YARN-7258. Add Node and Rack Hints to Opportunistic Scheduler. (Kartheek Muthyala via asuresh).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b733348d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b733348d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b733348d

Branch: refs/heads/HDFS-10467
Commit: b733348dde18a242e6c9074c512116a8baf1d281
Parents: 9288206
Author: Arun Suresh <asur...@apache.org>
Authored: Mon Oct 2 18:01:51 2017 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Thu Oct 5 09:58:04 2017 -0700

----------------------------------------------------------------------
 .../yarn/api/records/ResourceRequest.java       |  16 +
 .../api/impl/TestDistributedScheduling.java     | 649 ---------------
 .../TestOpportunisticContainerAllocation.java   | 784 -------------------
 ...TestOpportunisticContainerAllocationE2E.java | 784 +++++++++++++++++++
 .../server/api/protocolrecords/RemoteNode.java  |  35 +
 .../impl/pb/RemoteNodePBImpl.java               |  19 +
 .../OpportunisticContainerAllocator.java        | 333 ++++++--
 .../OpportunisticContainerContext.java          |  68 +-
 .../yarn_server_common_service_protos.proto     |   1 +
 .../TestOpportunisticContainerAllocator.java    | 599 ++++++++++++++
 ...pportunisticContainerAllocatorAMService.java |   8 +-
 ...pportunisticContainerAllocatorAMService.java |   2 +
 12 files changed, 1781 insertions(+), 1517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 21fa15f..beb3380 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -240,6 +240,22 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
     }
 
     /**
+     * Set the <code>executionTypeRequest</code> of the request with 'ensure
+     * execution type' flag set to true.
+     * @see ResourceRequest#setExecutionTypeRequest(
+     * ExecutionTypeRequest)
+     * @param executionType <code>executionType</code> of the request.
+     * @return {@link ResourceRequestBuilder}
+     */
+    @Public
+    @Evolving
+    public ResourceRequestBuilder executionType(ExecutionType executionType) {
+      resourceRequest.setExecutionTypeRequest(
+          ExecutionTypeRequest.newInstance(executionType, true));
+      return this;
+    }
+
+    /**
      * Set the <code>allocationRequestId</code> of the request.
      * @see ResourceRequest#setAllocationRequestId(long)
      * @param allocationRequestId

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
deleted file mode 100644
index 00f5e03..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ /dev/null
@@ -1,649 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ProfileCapability;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Validates End2End Distributed Scheduling flow which includes the AM
- * specifying OPPORTUNISTIC containers in its resource requests,
- * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
- * on the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the OpportunisticContainerAllocatorAMService running on the RM.
- */
-public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
-
-  private static final Log LOG =
-      LogFactory.getLog(TestDistributedScheduling.class);
-
-  protected MiniYARNCluster cluster;
-  protected YarnClient rmClient;
-  protected ApplicationMasterProtocol client;
-  protected Configuration conf;
-  protected Configuration yarnConf;
-  protected ApplicationAttemptId attemptId;
-  protected ApplicationId appId;
-
-  @Before
-  public void doBefore() throws Exception {
-    cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
-
-    conf = new YarnConfiguration();
-    conf.setBoolean(YarnConfiguration.
-        OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-    conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
-        10);
-    cluster.init(conf);
-    cluster.start();
-    yarnConf = cluster.getConfig();
-
-    // the client has to connect to AMRMProxy
-    yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-    rmClient = YarnClient.createYarnClient();
-    rmClient.init(yarnConf);
-    rmClient.start();
-
-    // Submit application
-    attemptId = createApp(rmClient, cluster, conf);
-    appId = attemptId.getApplicationId();
-    client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-  }
-
-  @After
-  public void doAfter() throws Exception {
-    if (client != null) {
-      try {
-        client.finishApplicationMaster(FinishApplicationMasterRequest
-            .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
-        rmClient.killApplication(attemptId.getApplicationId());
-        attemptId = null;
-      } catch (Exception e) {
-      }
-    }
-    if (rmClient != null) {
-      try {
-        rmClient.stop();
-      } catch (Exception e) {
-      }
-    }
-    if (cluster != null) {
-      try {
-        cluster.stop();
-      } catch (Exception e) {
-      }
-    }
-  }
-
-
-  /**
-   * Validates if Allocate Requests containing only OPPORTUNISTIC container
-   * requests are satisfied instantly.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 60000)
-  public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
-    LOG.info("testDistributedSchedulingE2E - Register");
-
-    RegisterApplicationMasterResponse responseRegister =
-        client.registerApplicationMaster(RegisterApplicationMasterRequest
-            .newInstance(NetUtils.getHostname(), 1024, ""));
-
-    Assert.assertNotNull(responseRegister);
-    Assert.assertNotNull(responseRegister.getQueue());
-    Assert.assertNotNull(responseRegister.getApplicationACLs());
-    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-    Assert
-        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-    // Wait until the RM has been updated and verify
-    Map<ApplicationId, RMApp> rmApps =
-        cluster.getResourceManager().getRMContext().getRMApps();
-    boolean rmUpdated = false;
-    for (int i=0; i<10 && !rmUpdated; i++) {
-      sleep(100);
-      RMApp rmApp = rmApps.get(appId);
-      if (rmApp.getState() == RMAppState.RUNNING) {
-        rmUpdated = true;
-      }
-    }
-    RMApp rmApp = rmApps.get(appId);
-    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-    LOG.info("testDistributedSchedulingE2E - Allocate");
-
-    AllocateRequest request =
-        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-
-    // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
-    // everything else
-    List<ResourceRequest> newAskList = new ArrayList<>();
-    for (ResourceRequest rr : request.getAskList()) {
-      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-        ResourceRequest newRR = ResourceRequest.newInstance(rr
-                .getPriority(), rr.getResourceName(),
-            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-            rr.getNodeLabelExpression(),
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true));
-        newAskList.add(newRR);
-      }
-    }
-    request.setAskList(newAskList);
-
-    AllocateResponse allocResponse = client.allocate(request);
-    Assert.assertNotNull(allocResponse);
-
-    // Ensure that all the requests are satisfied immediately
-    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-    // Verify that the allocated containers are OPPORTUNISTIC
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-          .newContainerTokenIdentifier(
-              allocatedContainer.getContainerToken());
-      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-          containerTokenIdentifier.getExecutionType());
-    }
-
-    // Check that the RM sees OPPORTUNISTIC containers
-    ResourceScheduler scheduler = cluster.getResourceManager()
-        .getResourceScheduler();
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerId containerId = allocatedContainer.getId();
-      RMContainer rmContainer = scheduler.getRMContainer(containerId);
-      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-          rmContainer.getExecutionType());
-    }
-
-    LOG.info("testDistributedSchedulingE2E - Finish");
-  }
-
-  /**
-   * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC
-   * container requests works as expected.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 60000)
-  public void testMixedExecutionTypeRequestE2E() throws Exception {
-    LOG.info("testDistributedSchedulingE2E - Register");
-
-    RegisterApplicationMasterResponse responseRegister =
-        client.registerApplicationMaster(RegisterApplicationMasterRequest
-            .newInstance(NetUtils.getHostname(), 1024, ""));
-
-    Assert.assertNotNull(responseRegister);
-    Assert.assertNotNull(responseRegister.getQueue());
-    Assert.assertNotNull(responseRegister.getApplicationACLs());
-    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-    Assert
-        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-    RMApp rmApp =
-        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
-    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-    LOG.info("testDistributedSchedulingE2E - Allocate");
-
-    AllocateRequest request =
-        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-    List<ResourceRequest> askList = request.getAskList();
-    List<ResourceRequest> newAskList = new ArrayList<>(askList);
-
-    // Duplicate all ANY requests marking them as opportunistic
-    for (ResourceRequest rr : askList) {
-      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-        ResourceRequest newRR = ResourceRequest.newInstance(rr
-                .getPriority(), rr.getResourceName(),
-            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-            rr.getNodeLabelExpression(),
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true));
-        newAskList.add(newRR);
-      }
-    }
-    request.setAskList(newAskList);
-
-    AllocateResponse allocResponse = client.allocate(request);
-    Assert.assertNotNull(allocResponse);
-
-    // Ensure that all the requests are satisfied immediately
-    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-    // Verify that the allocated containers are OPPORTUNISTIC
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-          .newContainerTokenIdentifier(
-              allocatedContainer.getContainerToken());
-      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-          containerTokenIdentifier.getExecutionType());
-    }
-
-    request.setAskList(new ArrayList<ResourceRequest>());
-    request.setResponseId(request.getResponseId() + 1);
-
-    Thread.sleep(1000);
-
-    // RM should allocate GUARANTEED containers within 2 calls to allocate()
-    allocResponse = client.allocate(request);
-    Assert.assertNotNull(allocResponse);
-    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-    // Verify that the allocated containers are GUARANTEED
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-          .newContainerTokenIdentifier(
-              allocatedContainer.getContainerToken());
-      Assert.assertEquals(ExecutionType.GUARANTEED,
-          containerTokenIdentifier.getExecutionType());
-    }
-
-    LOG.info("testDistributedSchedulingE2E - Finish");
-  }
-
-  /**
-   * Validates if AMRMClient can be used with Distributed Scheduling turned on.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 120000)
-  @SuppressWarnings("unchecked")
-  public void testAMRMClient() throws Exception {
-    AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
-    try {
-      Priority priority = Priority.newInstance(1);
-      Priority priority2 = Priority.newInstance(2);
-      Resource capability = Resource.newInstance(1024, 1);
-
-      List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
-      String node = nodeReports.get(0).getNodeId().getHost();
-      String rack = nodeReports.get(0).getRackName();
-      String[] nodes = new String[]{node};
-      String[] racks = new String[]{rack};
-
-      // start am rm client
-      amClient = new AMRMClientImpl(client);
-      amClient.init(yarnConf);
-      amClient.start();
-      amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
-
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-
-      RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
-          amClient.getTable(0);
-      ProfileCapability profileCapability =
-          ProfileCapability.newInstance(capability);
-
-      int containersRequestedNode = remoteRequestsTable.get(priority,
-          node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
-          .getNumContainers();
-      int containersRequestedRack = remoteRequestsTable.get(priority,
-          rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
-          .getNumContainers();
-      int containersRequestedAny = remoteRequestsTable.get(priority,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
-          .remoteRequest.getNumContainers();
-      int oppContainersRequestedAny =
-          remoteRequestsTable.get(priority2, ResourceRequest.ANY,
-              ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
-              .getNumContainers();
-
-      assertEquals(2, containersRequestedNode);
-      assertEquals(2, containersRequestedRack);
-      assertEquals(2, containersRequestedAny);
-      assertEquals(1, oppContainersRequestedAny);
-
-      assertEquals(4, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      // RM should allocate container within 2 calls to allocate()
-      int allocatedContainerCount = 0;
-      int iterationsLeft = 10;
-      Set<ContainerId> releases = new TreeSet<>();
-
-      amClient.getNMTokenCache().clearCache();
-      Assert.assertEquals(0,
-          amClient.getNMTokenCache().numberOfTokensInCache());
-      HashMap<String, Token> receivedNMTokens = new HashMap<>();
-
-      while (allocatedContainerCount <
-          (containersRequestedAny + oppContainersRequestedAny)
-          && iterationsLeft-- > 0) {
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        assertEquals(0, amClient.ask.size());
-        assertEquals(0, amClient.release.size());
-
-        allocatedContainerCount += allocResponse.getAllocatedContainers()
-            .size();
-        for (Container container : allocResponse.getAllocatedContainers()) {
-          ContainerId rejectContainerId = container.getId();
-          releases.add(rejectContainerId);
-        }
-
-        for (NMToken token : allocResponse.getNMTokens()) {
-          String nodeID = token.getNodeId().toString();
-          receivedNMTokens.put(nodeID, token.getToken());
-        }
-
-        if (allocatedContainerCount < containersRequestedAny) {
-          // sleep to let NM's heartbeat to RM and trigger allocations
-          sleep(100);
-        }
-      }
-
-      assertEquals(allocatedContainerCount,
-          containersRequestedAny + oppContainersRequestedAny);
-      for (ContainerId rejectContainerId : releases) {
-        amClient.releaseAssignedContainer(rejectContainerId);
-      }
-      assertEquals(3, amClient.release.size());
-      assertEquals(0, amClient.ask.size());
-
-      // need to tell the AMRMClient that we dont need these resources anymore
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-      assertEquals(4, amClient.ask.size());
-
-      // test RPC exception handling
-      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
-          nodes, racks, priority));
-      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
-          nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-
-      final AMRMClient amc = amClient;
-      ApplicationMasterProtocol realRM = amClient.rmClient;
-      try {
-        ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
-            .class);
-        when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
-            new Answer<AllocateResponse>() {
-              public AllocateResponse answer(InvocationOnMock invocation)
-                  throws Exception {
-                amc.removeContainerRequest(
-                    new AMRMClient.ContainerRequest(capability, nodes,
-                        racks, priority));
-                amc.removeContainerRequest(
-                    new AMRMClient.ContainerRequest(capability, nodes, racks,
-                        priority));
-                amc.removeContainerRequest(
-                    new AMRMClient.ContainerRequest(capability, null, null,
-                        priority2, 0, true, null,
-                        ExecutionTypeRequest.newInstance(
-                            ExecutionType.OPPORTUNISTIC, true)));
-                throw new Exception();
-              }
-            });
-        amClient.rmClient = mockRM;
-        amClient.allocate(0.1f);
-      } catch (Exception ioe) {
-      } finally {
-        amClient.rmClient = realRM;
-      }
-
-      assertEquals(3, amClient.release.size());
-      assertEquals(6, amClient.ask.size());
-
-      iterationsLeft = 3;
-      // do a few iterations to ensure RM is not going send new containers
-      while (iterationsLeft-- > 0) {
-        // inform RM of rejection
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        // RM did not send new containers because AM does not need any
-        assertEquals(0, allocResponse.getAllocatedContainers().size());
-        if (allocResponse.getCompletedContainersStatuses().size() > 0) {
-          for (ContainerStatus cStatus : allocResponse
-              .getCompletedContainersStatuses()) {
-            if (releases.contains(cStatus.getContainerId())) {
-              assertEquals(cStatus.getState(), ContainerState.COMPLETE);
-              assertEquals(-100, cStatus.getExitStatus());
-              releases.remove(cStatus.getContainerId());
-            }
-          }
-        }
-        if (iterationsLeft > 0) {
-          // sleep to make sure NM's heartbeat
-          sleep(100);
-        }
-      }
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-          null, null);
-
-    } finally {
-      if (amClient != null && amClient.getServiceState() == Service.STATE
-          .STARTED) {
-        amClient.stop();
-      }
-    }
-  }
-
-  /**
-   * Check if an AM can ask for opportunistic containers and get them.
-   * @throws Exception
-   */
-  @Test
-  public void testAMOpportunistic() throws Exception {
-    // Basic container to request
-    Resource capability = Resource.newInstance(1024, 1);
-    Priority priority = Priority.newInstance(1);
-
-    // Get the cluster topology
-    List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String rack = nodeReports.get(0).getRackName();
-    String[] nodes = new String[]{node};
-    String[] racks = new String[]{rack};
-
-    // Create an AM to request resources
-    AMRMClient<AMRMClient.ContainerRequest> amClient = null;
-    try {
-      amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
-      amClient.init(yarnConf);
-      amClient.start();
-      amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
-
-      // AM requests an opportunistic container
-      ExecutionTypeRequest execTypeRequest =
-          ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
-      ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
-          capability, nodes, racks, priority, 0, true, null, execTypeRequest);
-      amClient.addContainerRequest(containerRequest);
-
-      // Wait until the container is allocated
-      ContainerId opportunisticContainerId = null;
-      for (int i=0; i<10 && opportunisticContainerId == null; i++) {
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        List<Container> allocatedContainers =
-            allocResponse.getAllocatedContainers();
-        for (Container allocatedContainer : allocatedContainers) {
-          // Check that this is the container we required
-          assertEquals(ExecutionType.OPPORTUNISTIC,
-              allocatedContainer.getExecutionType());
-          opportunisticContainerId = allocatedContainer.getId();
-        }
-        sleep(100);
-      }
-      assertNotNull(opportunisticContainerId);
-
-      // The RM sees the container as OPPORTUNISTIC
-      ResourceScheduler scheduler = cluster.getResourceManager()
-          .getResourceScheduler();
-      RMContainer rmContainer = scheduler.getRMContainer(
-          opportunisticContainerId);
-      assertEquals(ExecutionType.OPPORTUNISTIC,
-          rmContainer.getExecutionType());
-
-      // Release the opportunistic container
-      amClient.releaseAssignedContainer(opportunisticContainerId);
-      // Wait for the release container to appear
-      boolean released = false;
-      for (int i=0; i<10 && !released; i++) {
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        List<ContainerStatus> completedContainers =
-            allocResponse.getCompletedContainersStatuses();
-        for (ContainerStatus completedContainer : completedContainers) {
-          ContainerId completedContainerId =
-              completedContainer.getContainerId();
-          assertEquals(completedContainerId, opportunisticContainerId);
-          released = true;
-        }
-        if (!released) {
-          sleep(100);
-        }
-      }
-      assertTrue(released);
-
-      // The RM shouldn't see the container anymore
-      rmContainer = scheduler.getRMContainer(opportunisticContainerId);
-      assertNull(rmContainer);
-
-      // Clean the AM
-      amClient.unregisterApplicationMaster(
-          FinalApplicationStatus.SUCCEEDED, null, null);
-    } finally {
-      if (amClient != null &&
-          amClient.getServiceState() == Service.STATE.STARTED) {
-        amClient.close();
-      }
-    }
-  }
-
-  private void sleep(int sleepTime) {
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
deleted file mode 100644
index 12c32fc..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ProfileCapability;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Class that tests the allocation of OPPORTUNISTIC containers through the
- * centralized ResourceManager.
- */
-public class TestOpportunisticContainerAllocation {
-  private static Configuration conf = null;
-  private static MiniYARNCluster yarnCluster = null;
-  private static YarnClient yarnClient = null;
-  private static List<NodeReport> nodeReports = null;
-  private static int nodeCount = 3;
-
-  private static final int ROLLING_INTERVAL_SEC = 13;
-  private static final long AM_EXPIRE_MS = 4000;
-
-  private static Resource capability;
-  private static ProfileCapability profileCapability;
-  private static Priority priority;
-  private static Priority priority2;
-  private static Priority priority3;
-  private static Priority priority4;
-  private static String node;
-  private static String rack;
-  private static String[] nodes;
-  private static String[] racks;
-  private final static int DEFAULT_ITERATION = 3;
-
-  // Per test..
-  private ApplicationAttemptId attemptId = null;
-  private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
-  private long availMB;
-  private int availVCores;
-  private long allocMB;
-  private int allocVCores;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    // start minicluster
-    conf = new YarnConfiguration();
-    conf.setLong(
-        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
-        ROLLING_INTERVAL_SEC);
-    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
-    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
-    // set the minimum allocation so that resource decrease can go under 1024
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
-    conf.setBoolean(
-        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
-    conf.setInt(
-        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
-    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
-    yarnCluster =
-        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
-    yarnCluster.init(conf);
-    yarnCluster.start();
-
-    // start rm client
-    yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(conf);
-    yarnClient.start();
-
-    // get node info
-    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
-
-    priority = Priority.newInstance(1);
-    priority2 = Priority.newInstance(2);
-    priority3 = Priority.newInstance(3);
-    priority4 = Priority.newInstance(4);
-    capability = Resource.newInstance(512, 1);
-    profileCapability = ProfileCapability.newInstance(capability);
-
-    node = nodeReports.get(0).getNodeId().getHost();
-    rack = nodeReports.get(0).getRackName();
-    nodes = new String[]{node};
-    racks = new String[]{rack};
-  }
-
-  @Before
-  public void startApp() throws Exception {
-    // submit new app
-    ApplicationSubmissionContext appContext =
-        yarnClient.createApplication().getApplicationSubmissionContext();
-    ApplicationId appId = appContext.getApplicationId();
-    // set the application name
-    appContext.setApplicationName("Test");
-    // Set the priority for the application master
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(0);
-    appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
-        Collections.<String, LocalResource>emptyMap(),
-        new HashMap<String, String>(), Arrays.asList("sleep", "100"),
-        new HashMap<String, ByteBuffer>(), null,
-        new HashMap<ApplicationAccessType, String>());
-    appContext.setAMContainerSpec(amContainer);
-    appContext.setResource(Resource.newInstance(1024, 1));
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest =
-        Records.newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
-    yarnClient.submitApplication(appContext);
-
-    // wait for app to start
-    RMAppAttempt appAttempt = null;
-    while (true) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() ==
-          YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
-            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-    }
-    // Just dig into the ResourceManager and get the AMRMToken just for the sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-
-    // emulate RM setup of AMRM token in credentials by adding the token
-    // *before* setting the token service
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-    appAttempt.getAMRMToken()
-        .setService(ClientRMProxy.getAMRMTokenService(conf));
-
-    // start am rm client
-    amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient
-        .createAMRMClient();
-
-    //setting an instance NMTokenCache
-    amClient.setNMTokenCache(new NMTokenCache());
-    //asserting we are not using the singleton instance cache
-    Assert.assertNotSame(NMTokenCache.getSingleton(),
-        amClient.getNMTokenCache());
-
-    amClient.init(conf);
-    amClient.start();
-
-    amClient.registerApplicationMaster("Host", 10000, "");
-  }
-
-  @After
-  public void cancelApp() throws YarnException, IOException {
-    try {
-      amClient
-          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
-              null);
-    } finally {
-      if (amClient != null &&
-          amClient.getServiceState() == Service.STATE.STARTED) {
-        amClient.stop();
-      }
-    }
-    yarnClient.killApplication(attemptId.getApplicationId());
-    attemptId = null;
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    if (yarnClient != null &&
-        yarnClient.getServiceState() == Service.STATE.STARTED) {
-      yarnClient.stop();
-    }
-    if (yarnCluster != null &&
-        yarnCluster.getServiceState() == Service.STATE.STARTED) {
-      yarnCluster.stop();
-    }
-  }
-
-  @Test(timeout = 60000)
-  public void testPromotionFromAcquired() throws YarnException, IOException {
-    // setup container request
-    assertEquals(0, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
-            true, null,
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true)));
-
-    int oppContainersRequestedAny =
-        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
-            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
-            .getNumContainers();
-
-    assertEquals(1, oppContainersRequestedAny);
-
-    assertEquals(1, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    // RM should allocate container within 2 calls to allocate()
-    int allocatedContainerCount = 0;
-    Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
-    int iterationsLeft = 50;
-
-    amClient.getNMTokenCache().clearCache();
-    Assert.assertEquals(0,
-        amClient.getNMTokenCache().numberOfTokensInCache());
-    HashMap<String, Token> receivedNMTokens = new HashMap<>();
-
-    updateMetrics("Before Opp Allocation");
-
-    while (allocatedContainerCount < oppContainersRequestedAny
-        && iterationsLeft-- > 0) {
-      AllocateResponse allocResponse = amClient.allocate(0.1f);
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      allocatedContainerCount +=
-          allocResponse.getAllocatedContainers().size();
-      for (Container container : allocResponse.getAllocatedContainers()) {
-        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-          allocatedOpportContainers.put(container.getId(), container);
-          removeCR(container);
-        }
-      }
-
-      for (NMToken token : allocResponse.getNMTokens()) {
-        String nodeID = token.getNodeId().toString();
-        receivedNMTokens.put(nodeID, token.getToken());
-      }
-
-      if (allocatedContainerCount < oppContainersRequestedAny) {
-        // sleep to let NM's heartbeat to RM and trigger allocations
-        sleep(100);
-      }
-    }
-
-    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
-    assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
-
-    updateMetrics("After Opp Allocation / Before Promotion");
-
-    try {
-      Container c = allocatedOpportContainers.values().iterator().next();
-      amClient.requestContainerUpdate(
-          c, UpdateContainerRequest.newInstance(c.getVersion(),
-              c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
-              null, ExecutionType.OPPORTUNISTIC));
-      Assert.fail("Should throw Exception..");
-    } catch (IllegalArgumentException e) {
-      System.out.println("## " + e.getMessage());
-      Assert.assertTrue(e.getMessage().contains(
-          "target should be GUARANTEED and original should be OPPORTUNISTIC"));
-    }
-
-    Container c = allocatedOpportContainers.values().iterator().next();
-    amClient.requestContainerUpdate(
-        c, UpdateContainerRequest.newInstance(c.getVersion(),
-            c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
-            null, ExecutionType.GUARANTEED));
-    iterationsLeft = 120;
-    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
-    // do a few iterations to ensure RM is not going to send new containers
-    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
-      // inform RM of rejection
-      AllocateResponse allocResponse = amClient.allocate(0.1f);
-      // RM did not send new containers because AM does not need any
-      if (allocResponse.getUpdatedContainers() != null) {
-        for (UpdatedContainer updatedContainer : allocResponse
-            .getUpdatedContainers()) {
-          System.out.println("Got update..");
-          updatedContainers.put(updatedContainer.getContainer().getId(),
-              updatedContainer);
-        }
-      }
-      if (iterationsLeft > 0) {
-        // sleep to make sure NM's heartbeat
-        sleep(100);
-      }
-    }
-
-    updateMetrics("After Promotion");
-
-    assertEquals(1, updatedContainers.size());
-    for (ContainerId cId : allocatedOpportContainers.keySet()) {
-      Container orig = allocatedOpportContainers.get(cId);
-      UpdatedContainer updatedContainer = updatedContainers.get(cId);
-      assertNotNull(updatedContainer);
-      assertEquals(ExecutionType.GUARANTEED,
-          updatedContainer.getContainer().getExecutionType());
-      assertEquals(orig.getResource(),
-          updatedContainer.getContainer().getResource());
-      assertEquals(orig.getNodeId(),
-          updatedContainer.getContainer().getNodeId());
-      assertEquals(orig.getVersion() + 1,
-          updatedContainer.getContainer().getVersion());
-    }
-    assertEquals(0, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-    amClient.ask.clear();
-  }
-
-  @Test(timeout = 60000)
-  public void testDemotionFromAcquired() throws YarnException, IOException {
-    // setup container request
-    assertEquals(0, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority3));
-
-    int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
-        .remoteRequest.getNumContainers();
-
-    assertEquals(1, guarContainersRequestedAny);
-
-    assertEquals(1, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    // RM should allocate container within 2 calls to allocate()
-    int allocatedContainerCount = 0;
-    Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
-    int iterationsLeft = 50;
-
-    amClient.getNMTokenCache().clearCache();
-    Assert.assertEquals(0,
-        amClient.getNMTokenCache().numberOfTokensInCache());
-    HashMap<String, Token> receivedNMTokens = new HashMap<>();
-
-    updateMetrics("Before Guar Allocation");
-
-    while (allocatedContainerCount < guarContainersRequestedAny
-        && iterationsLeft-- > 0) {
-      AllocateResponse allocResponse = amClient.allocate(0.1f);
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      allocatedContainerCount +=
-          allocResponse.getAllocatedContainers().size();
-      for (Container container : allocResponse.getAllocatedContainers()) {
-        if (container.getExecutionType() == ExecutionType.GUARANTEED) {
-          allocatedGuarContainers.put(container.getId(), container);
-          removeCR(container);
-        }
-      }
-
-      for (NMToken token : allocResponse.getNMTokens()) {
-        String nodeID = token.getNodeId().toString();
-        receivedNMTokens.put(nodeID, token.getToken());
-      }
-
-      if (allocatedContainerCount < guarContainersRequestedAny) {
-        // sleep to let NM's heartbeat to RM and trigger allocations
-        sleep(100);
-      }
-    }
-
-    assertEquals(guarContainersRequestedAny, allocatedContainerCount);
-    assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
-
-    updateMetrics("After Guar Allocation / Before Demotion");
-
-    try {
-      Container c = allocatedGuarContainers.values().iterator().next();
-      amClient.requestContainerUpdate(
-          c, UpdateContainerRequest.newInstance(c.getVersion(),
-              c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
-              null, ExecutionType.GUARANTEED));
-      Assert.fail("Should throw Exception..");
-    } catch (IllegalArgumentException e) {
-      System.out.println("## " + e.getMessage());
-      Assert.assertTrue(e.getMessage().contains(
-          "target should be OPPORTUNISTIC and original should be GUARANTEED"));
-    }
-
-    Container c = allocatedGuarContainers.values().iterator().next();
-    amClient.requestContainerUpdate(
-        c, UpdateContainerRequest.newInstance(c.getVersion(),
-            c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
-            null, ExecutionType.OPPORTUNISTIC));
-    iterationsLeft = 120;
-    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
-    // do a few iterations to ensure RM is not going to send new containers
-    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
-      // inform RM of rejection
-      AllocateResponse allocResponse = amClient.allocate(0.1f);
-      // RM did not send new containers because AM does not need any
-      if (allocResponse.getUpdatedContainers() != null) {
-        for (UpdatedContainer updatedContainer : allocResponse
-            .getUpdatedContainers()) {
-          System.out.println("Got update..");
-          updatedContainers.put(updatedContainer.getContainer().getId(),
-              updatedContainer);
-        }
-      }
-      if (iterationsLeft > 0) {
-        // sleep to make sure NM's heartbeat
-        sleep(100);
-      }
-    }
-
-    updateMetrics("After Demotion");
-
-    assertEquals(1, updatedContainers.size());
-    for (ContainerId cId : allocatedGuarContainers.keySet()) {
-      Container orig = allocatedGuarContainers.get(cId);
-      UpdatedContainer updatedContainer = updatedContainers.get(cId);
-      assertNotNull(updatedContainer);
-      assertEquals(ExecutionType.OPPORTUNISTIC,
-          updatedContainer.getContainer().getExecutionType());
-      assertEquals(orig.getResource(),
-          updatedContainer.getContainer().getResource());
-      assertEquals(orig.getNodeId(),
-          updatedContainer.getContainer().getNodeId());
-      assertEquals(orig.getVersion() + 1,
-          updatedContainer.getContainer().getVersion());
-    }
-    assertEquals(0, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-    amClient.ask.clear();
-  }
-
-  @Test(timeout = 60000)
-  public void testMixedAllocationAndRelease() throws YarnException,
-      IOException {
-    // setup container request
-    assertEquals(0, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
-            true, null,
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true)));
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
-            true, null,
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true)));
-
-    int containersRequestedNode = amClient.getTable(0).get(priority,
-        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
-        .getNumContainers();
-    int containersRequestedRack = amClient.getTable(0).get(priority,
-        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
-        .getNumContainers();
-    int containersRequestedAny = amClient.getTable(0).get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
-        .remoteRequest.getNumContainers();
-    int oppContainersRequestedAny =
-        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
-            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
-            .getNumContainers();
-
-    assertEquals(4, containersRequestedNode);
-    assertEquals(4, containersRequestedRack);
-    assertEquals(4, containersRequestedAny);
-    assertEquals(2, oppContainersRequestedAny);
-
-    assertEquals(4, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    amClient.removeContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.removeContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.removeContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
-            true, null,
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true)));
-
-    containersRequestedNode = amClient.getTable(0).get(priority,
-        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
-        .getNumContainers();
-    containersRequestedRack = amClient.getTable(0).get(priority,
-        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
-        .getNumContainers();
-    containersRequestedAny = amClient.getTable(0).get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
-        .remoteRequest.getNumContainers();
-    oppContainersRequestedAny =
-        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
-            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
-            .getNumContainers();
-
-    assertEquals(2, containersRequestedNode);
-    assertEquals(2, containersRequestedRack);
-    assertEquals(2, containersRequestedAny);
-    assertEquals(1, oppContainersRequestedAny);
-
-    assertEquals(4, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    // RM should allocate container within 2 calls to allocate()
-    int allocatedContainerCount = 0;
-    int allocatedOpportContainerCount = 0;
-    int iterationsLeft = 50;
-    Set<ContainerId> releases = new TreeSet<>();
-
-    amClient.getNMTokenCache().clearCache();
-    Assert.assertEquals(0,
-        amClient.getNMTokenCache().numberOfTokensInCache());
-    HashMap<String, Token> receivedNMTokens = new HashMap<>();
-
-    while (allocatedContainerCount <
-        containersRequestedAny + oppContainersRequestedAny
-        && iterationsLeft-- > 0) {
-      AllocateResponse allocResponse = amClient.allocate(0.1f);
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      allocatedContainerCount +=
-          allocResponse.getAllocatedContainers().size();
-      for (Container container : allocResponse.getAllocatedContainers()) {
-        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-          allocatedOpportContainerCount++;
-        }
-        ContainerId rejectContainerId = container.getId();
-        releases.add(rejectContainerId);
-      }
-
-      for (NMToken token : allocResponse.getNMTokens()) {
-        String nodeID = token.getNodeId().toString();
-        receivedNMTokens.put(nodeID, token.getToken());
-      }
-
-      if (allocatedContainerCount < containersRequestedAny) {
-        // sleep to let NM's heartbeat to RM and trigger allocations
-        sleep(100);
-      }
-    }
-
-    assertEquals(containersRequestedAny + oppContainersRequestedAny,
-        allocatedContainerCount);
-    assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
-    for (ContainerId rejectContainerId : releases) {
-      amClient.releaseAssignedContainer(rejectContainerId);
-    }
-    assertEquals(3, amClient.release.size());
-    assertEquals(0, amClient.ask.size());
-
-    // need to tell the AMRMClient that we don't need these resources anymore
-    amClient.removeContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.removeContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-    amClient.removeContainerRequest(
-        new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
-            true, null,
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true)));
-    assertEquals(4, amClient.ask.size());
-
-    iterationsLeft = 3;
-    // do a few iterations to ensure RM is not going to send new containers
-    while (iterationsLeft-- > 0) {
-      // inform RM of rejection
-      AllocateResponse allocResponse = amClient.allocate(0.1f);
-      // RM did not send new containers because AM does not need any
-      assertEquals(0, allocResponse.getAllocatedContainers().size());
-      if (allocResponse.getCompletedContainersStatuses().size() > 0) {
-        for (ContainerStatus cStatus : allocResponse
-            .getCompletedContainersStatuses()) {
-          if (releases.contains(cStatus.getContainerId())) {
-            assertEquals(cStatus.getState(), ContainerState.COMPLETE);
-            assertEquals(-100, cStatus.getExitStatus());
-            releases.remove(cStatus.getContainerId());
-          }
-        }
-      }
-      if (iterationsLeft > 0) {
-        // sleep to make sure NM's heartbeat
-        sleep(100);
-      }
-    }
-    assertEquals(0, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-  }
-
-  /**
-   * Tests allocation with requests comprising only opportunistic containers.
-   */
-  @Test(timeout = 60000)
-  public void testOpportunisticAllocation() throws YarnException, IOException {
-    // setup container request
-    assertEquals(0, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
-            true, null,
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true)));
-    amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
-            true, null,
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true)));
-
-    int oppContainersRequestedAny = amClient.getTable(0)
-        .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
-            profileCapability).remoteRequest.getNumContainers();
-
-    assertEquals(2, oppContainersRequestedAny);
-
-    assertEquals(1, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
-
-    // RM should allocate container within 2 calls to allocate()
-    int allocatedContainerCount = 0;
-    int iterationsLeft = 10;
-    Set<ContainerId> releases = new TreeSet<>();
-
-    amClient.getNMTokenCache().clearCache();
-    Assert.assertEquals(0,
-        amClient.getNMTokenCache().numberOfTokensInCache());
-    HashMap<String, Token> receivedNMTokens = new HashMap<>();
-
-    while (allocatedContainerCount < oppContainersRequestedAny
-        && iterationsLeft-- > 0) {
-      AllocateResponse allocResponse = amClient.allocate(0.1f);
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      for (Container container : allocResponse.getAllocatedContainers()) {
-        allocatedContainerCount++;
-        ContainerId rejectContainerId = container.getId();
-        releases.add(rejectContainerId);
-      }
-
-      for (NMToken token : allocResponse.getNMTokens()) {
-        String nodeID = token.getNodeId().toString();
-        receivedNMTokens.put(nodeID, token.getToken());
-      }
-
-      if (allocatedContainerCount < oppContainersRequestedAny) {
-        // sleep to let NM's heartbeat to RM and trigger allocations
-        sleep(100);
-      }
-    }
-
-    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
-    assertEquals(1, receivedNMTokens.values().size());
-  }
-
-  private void removeCR(Container container) {
-    List<? extends Collection<AMRMClient.ContainerRequest>>
-        matchingRequests = amClient.getMatchingRequests(container
-            .getPriority(),
-        ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
-        container.getResource());
-    Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
-    for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
-      for (AMRMClient.ContainerRequest cr : rc) {
-        toRemove.add(cr);
-      }
-    }
-    for (AMRMClient.ContainerRequest cr : toRemove) {
-      amClient.removeContainerRequest(cr);
-    }
-  }
-
-  private void updateMetrics(String msg) {
-    AbstractYarnScheduler scheduler =
-        (AbstractYarnScheduler)yarnCluster.getResourceManager()
-            .getResourceScheduler();
-    availMB = scheduler.getRootQueueMetrics().getAvailableMB();
-    availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
-    allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
-    allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
-    System.out.println("## METRICS (" + msg + ")==>");
-    System.out.println(" : availMB=" + availMB + ", " +
-        "availVCores=" +availVCores + ", " +
-        "allocMB=" + allocMB + ", " +
-        "allocVCores=" + allocVCores + ", ");
-    System.out.println("<== ##");
-  }
-
-  private void sleep(int sleepTime) {
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to