YARN-5486. Update OpportunisticContainerAllocatorAMService::allocate method to 
handle OPPORTUNISTIC container requests. (Konstantinos Karanasos via asuresh)
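
With this change, an AM can request OPPORTUNISTIC containers from the
centralized RM over the regular allocate protocol. A minimal request sketch,
mirroring the AMRMClient call exercised by the new test below (the capability
and priority values are illustrative, and amClient is assumed to be an
initialized, registered AMRMClient):

    // Ask for one OPPORTUNISTIC container anywhere in the cluster.
    Resource capability = Resource.newInstance(1024, 1);
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, null, null,
            Priority.newInstance(2), 0, true, null,
            ExecutionTypeRequest.newInstance(
                ExecutionType.OPPORTUNISTIC, true)));
    // The containers come back from subsequent amClient.allocate(...) calls.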


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/10be4598
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/10be4598
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/10be4598

Branch: refs/heads/HDFS-10467
Commit: 10be45986cdf86a89055065b752959bd6369d54f
Parents: 1e0ea27
Author: Arun Suresh <asur...@apache.org>
Authored: Thu Sep 29 15:11:41 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Thu Sep 29 15:11:41 2016 -0700

----------------------------------------------------------------------
 .../TestOpportunisticContainerAllocation.java   | 398 +++++++++++++++++++
 .../OpportunisticContainerAllocator.java        |  22 +-
 .../OpportunisticContainerContext.java          |  49 ++-
 .../yarn/server/nodemanager/NodeManager.java    |   3 +-
 .../amrmproxy/DefaultRequestInterceptor.java    |   4 +-
 .../scheduler/DistributedScheduler.java         |  59 +--
 ...pportunisticContainerAllocatorAMService.java | 215 ++++++----
 .../server/resourcemanager/ResourceManager.java |  12 +-
 .../scheduler/SchedulerApplicationAttempt.java  |  58 ++-
 .../distributed/NodeQueueLoadMonitor.java       |  45 ++-
 ...pportunisticContainerAllocatorAMService.java |  10 +-
 11 files changed, 707 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
new file mode 100644
index 0000000..b9b4b02
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Class that tests the allocation of OPPORTUNISTIC containers through the
+ * centralized ResourceManager.
+ */
+public class TestOpportunisticContainerAllocation {
+  private static Configuration conf = null;
+  private static MiniYARNCluster yarnCluster = null;
+  private static YarnClient yarnClient = null;
+  private static List<NodeReport> nodeReports = null;
+  private static ApplicationAttemptId attemptId = null;
+  private static int nodeCount = 3;
+
+  private static final int ROLLING_INTERVAL_SEC = 13;
+  private static final long AM_EXPIRE_MS = 4000;
+
+  private static Resource capability;
+  private static Priority priority;
+  private static Priority priority2;
+  private static String node;
+  private static String rack;
+  private static String[] nodes;
+  private static String[] racks;
+  private final static int DEFAULT_ITERATION = 3;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // start minicluster
+    conf = new YarnConfiguration();
+    conf.setLong(
+        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+        ROLLING_INTERVAL_SEC);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    // set the minimum allocation so that resource decrease can go under 1024
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+    yarnCluster =
+        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+
+    priority = Priority.newInstance(1);
+    priority2 = Priority.newInstance(2);
+    capability = Resource.newInstance(1024, 1);
+
+    node = nodeReports.get(0).getNodeId().getHost();
+    rack = nodeReports.get(0).getRackName();
+    nodes = new String[]{node};
+    racks = new String[]{rack};
+  }
+
+  @Before
+  public void startApp() throws Exception {
+    // submit new app
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource>emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+        new HashMap<String, ByteBuffer>(), null,
+        new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest =
+        Records.newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    RMAppAttempt appAttempt = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (true) {
+          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+            break;
+          }
+        }
+        break;
+      }
+    }
+    // Just dig into the ResourceManager and get the AMRMToken just for the sake
+    // of testing.
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+    // emulate RM setup of AMRM token in credentials by adding the token
+    // *before* setting the token service
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    appAttempt.getAMRMToken()
+        .setService(ClientRMProxy.getAMRMTokenService(conf));
+  }
+
+  @After
+  public void cancelApp() throws YarnException, IOException {
+    yarnClient.killApplication(attemptId.getApplicationId());
+    attemptId = null;
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (yarnClient != null &&
+        yarnClient.getServiceState() == Service.STATE.STARTED) {
+      yarnClient.stop();
+    }
+    if (yarnCluster != null &&
+        yarnCluster.getServiceState() == Service.STATE.STARTED) {
+      yarnCluster.stop();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testAMRMClient() throws YarnException, IOException {
+    AMRMClient<AMRMClient.ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+
+      //setting an instance NMTokenCache
+      amClient.setNMTokenCache(new NMTokenCache());
+      //asserting we are not using the singleton instance cache
+      Assert.assertNotSame(NMTokenCache.getSingleton(),
+          amClient.getNMTokenCache());
+
+      amClient.init(conf);
+      amClient.start();
+
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
+
+      amClient
+          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+              null);
+
+    } finally {
+      if (amClient != null &&
+          amClient.getServiceState() == Service.STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  private void testAllocation(
+      final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
+      throws YarnException, IOException {
+    // setup container request
+
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int containersRequestedNode = amClient.getTable(0).get(priority,
+        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = amClient.getTable(0).get(priority,
+        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = amClient.getTable(0).get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(2, containersRequestedNode);
+    assertEquals(2, containersRequestedRack);
+    assertEquals(2, containersRequestedAny);
+    assertEquals(1, oppContainersRequestedAny);
+
+    assertEquals(4, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate the requested containers within a few allocate() calls
+    int allocatedContainerCount = 0;
+    int allocatedOpportContainerCount = 0;
+    int iterationsLeft = 10;
+    Set<ContainerId> releases = new TreeSet<>();
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    while (allocatedContainerCount <
+        containersRequestedAny + oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount += allocResponse.getAllocatedContainers()
+          .size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainerCount++;
+        }
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < containersRequestedAny) {
+        // sleep to let the NMs heartbeat to the RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(allocatedContainerCount,
+        containersRequestedAny + oppContainersRequestedAny);
+    assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
+    for (ContainerId rejectContainerId : releases) {
+      amClient.releaseAssignedContainer(rejectContainerId);
+    }
+    assertEquals(3, amClient.release.size());
+    assertEquals(0, amClient.ask.size());
+
+    // need to tell the AMRMClient that we don't need these resources anymore
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    assertEquals(4, amClient.ask.size());
+
+    iterationsLeft = 3;
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      assertEquals(0, allocResponse.getAllocatedContainers().size());
+      if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+        for (ContainerStatus cStatus : allocResponse
+            .getCompletedContainersStatuses()) {
+          if (releases.contains(cStatus.getContainerId())) {
+            assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+            assertEquals(-100, cStatus.getExitStatus());
+            releases.remove(cStatus.getContainerId());
+          }
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure the NMs heartbeat to the RM
+        sleep(100);
+      }
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+  }
+
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
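
On the response side, an AM can separate OPPORTUNISTIC grants from GUARANTEED
ones via Container#getExecutionType(), as the allocation loop above does. A
minimal consumer sketch (launchOpportunistic and launchGuaranteed are
hypothetical handlers, not part of this patch):

    for (Container c : allocResponse.getAllocatedContainers()) {
      if (c.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
        launchOpportunistic(c);  // hypothetical: may be queued at the NM
      } else {
        launchGuaranteed(c);     // hypothetical
      }
    }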

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 9b2fd38..9c158e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -145,15 +146,6 @@ public class OpportunisticContainerAllocator {
     }
 
     /**
-     * Sets the underlying Atomic Long. To be used when implementation needs to
-     * share the underlying AtomicLong of an existing counter.
-     * @param counter AtomicLong
-     */
-    public void setContainerIdCounter(AtomicLong counter) {
-      this.containerIdCounter = counter;
-    }
-
-    /**
      * Generates a new long value. Default implementation increments the
      * underlying AtomicLong. Sub classes are encouraged to over-ride this
      * behaviour.
@@ -213,6 +205,10 @@ public class OpportunisticContainerAllocator {
     PartitionedResourceRequests partitionedAsks =
         partitionAskList(request.getAskList());
 
+    if (partitionedAsks.getOpportunistic().isEmpty()) {
+      return Collections.emptyList();
+    }
+
     List<ContainerId> releasedContainers = request.getReleaseList();
     int numReleasedContainers = releasedContainers.size();
     if (numReleasedContainers > 0) {
@@ -236,8 +232,8 @@ public class OpportunisticContainerAllocator {
         appContext.getOutstandingOpReqs().descendingKeySet()) {
       // Allocated containers :
       //  Key = Requested Capability,
-      //  Value = List of Containers of given Cap (The actual container size
-      //          might be different than what is requested.. which is why
+      //  Value = List of Containers of given cap (the actual container size
+      //          might be different than what is requested, which is why
       //          we need the requested capability (key) to match against
       //          the outstanding reqs)
       Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
@@ -290,6 +286,10 @@ public class OpportunisticContainerAllocator {
       }
       nodesForScheduling.add(nodeEntry.getValue());
     }
+    if (nodesForScheduling.isEmpty()) {
+      LOG.warn("No nodes available for allocating opportunistic containers.");
+      return;
+    }
     int numAllocated = 0;
     int nextNodeToSchedule = 0;
     for (int numCont = 0; numCont < toAllocate; numCont++) {
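
The early return added above depends on partitionAskList(), which is not shown
in this hunk; conceptually it splits the ask list by execution type, roughly as
in the following sketch (the method and list names are illustrative):

    // Illustrative only: split asks into guaranteed vs. opportunistic.
    static void partitionAsks(List<ResourceRequest> asks,
        List<ResourceRequest> guaranteed, List<ResourceRequest> opp) {
      for (ResourceRequest rr : asks) {
        ExecutionTypeRequest et = rr.getExecutionTypeRequest();
        if (et != null
            && et.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
          opp.add(rr);
        } else {
          guaranteed.add(rr);
        }
      }
    }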

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 1b701ea..6fcddf8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -28,9 +30,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -56,15 +60,13 @@ public class OpportunisticContainerContext {
   private ContainerIdGenerator containerIdGenerator =
       new ContainerIdGenerator();
 
-  private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
+  private volatile List<NodeId> nodeList = new LinkedList<>();
+  private final Map<String, NodeId> nodeMap = new LinkedHashMap<>();
 
-  // Mapping of NodeId to NodeTokens. Populated either from RM response or
-  // generated locally if required.
-  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
   private final Set<String> blacklist = new HashSet<>();
 
   // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
-  // Resource Name (Host/rack/any) and capability. This mapping is required
+  // Resource Name (host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
   // ResourceRequest (ask).
   private final TreeMap<Priority, Map<Resource, ResourceRequest>>
@@ -74,7 +76,7 @@ public class OpportunisticContainerContext {
     return containersAllocated;
   }
 
-  public OpportunisticContainerAllocator.AllocationParams getAppParams() {
+  public AllocationParams getAppParams() {
     return appParams;
   }
 
@@ -88,11 +90,29 @@ public class OpportunisticContainerContext {
   }
 
   public Map<String, NodeId> getNodeMap() {
-    return nodeMap;
+    return Collections.unmodifiableMap(nodeMap);
   }
 
-  public Map<NodeId, NMToken> getNodeTokens() {
-    return nodeTokens;
+  public synchronized void updateNodeList(List<NodeId> newNodeList) {
+    // This is an optimization for centralized placement. The
+    // OppContainerAllocatorAMService has a cached list of nodes which it sets
+    // here. The nodeMap needs to be updated only if the backing node list is
+    // modified.
+    if (newNodeList != nodeList) {
+      nodeList = newNodeList;
+      nodeMap.clear();
+      for (NodeId n : nodeList) {
+        nodeMap.put(n.getHost(), n);
+      }
+    }
+  }
+
+  public void updateAllocationParams(Resource minResource, Resource maxResource,
+      Resource incrResource, int containerTokenExpiryInterval) {
+    appParams.setMinResource(minResource);
+    appParams.setMaxResource(maxResource);
+    appParams.setIncrementResource(incrResource);
+    appParams.setContainerTokenExpiryInterval(containerTokenExpiryInterval);
   }
 
   public Set<String> getBlacklist() {
@@ -104,6 +124,15 @@ public class OpportunisticContainerContext {
     return outstandingOpReqs;
   }
 
+  public void updateCompletedContainers(AllocateResponse allocateResponse) {
+    for (ContainerStatus cs :
+        allocateResponse.getCompletedContainersStatuses()) {
+      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        containersAllocated.remove(cs.getContainerId());
+      }
+    }
+  }
+
   /**
    * Takes a list of ResourceRequests (asks), extracts the key information viz.
    * (Priority, ResourceName, Capability) and adds to the outstanding
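
Note the identity check in updateNodeList() above: a caller that keeps handing
in the same cached List instance pays nothing, and the nodeMap is rebuilt only
when a new list object arrives. The caller-side contract, sketched (ctx,
nodeMonitor and k stand in for the fields the AM service uses later in this
patch):

    List<NodeId> cached = nodeMonitor.selectLeastLoadedNodes(k);
    ctx.updateNodeList(cached);  // new instance: nodeMap rebuilt
    ctx.updateNodeList(cached);  // same instance: no-op
    cached = nodeMonitor.selectLeastLoadedNodes(k);
    ctx.updateNodeList(cached);  // refreshed instance: nodeMap rebuilt again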

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 7f13334..37f67c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -336,8 +336,7 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);
 
     boolean isDistSchedulingEnabled =
-        conf.getBoolean(YarnConfiguration.
-            OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
             YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
 
     this.context = createNMContext(containerTokenSecretManager,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index efbdfb4..22fc8f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -152,7 +152,7 @@ public final class DefaultRequestInterceptor extends
       return ((DistributedSchedulingAMProtocol)rmClient)
           .registerApplicationMasterForDistributedScheduling(request);
     } else {
-      throw new YarnException("Distributed Scheduling is not enabled !!");
+      throw new YarnException("Distributed Scheduling is not enabled.");
     }
   }
 
@@ -174,7 +174,7 @@ public final class DefaultRequestInterceptor extends
       }
       return allocateResponse;
     } else {
-      throw new YarnException("Distributed Scheduling is not enabled !!");
+      throw new YarnException("Distributed Scheduling is not enabled.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index 368858c..8a40337 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
@@ -32,8 +33,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -48,7 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -74,6 +75,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
   private OpportunisticContainerContext oppContainerContext =
       new OpportunisticContainerContext();
 
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
   private ApplicationAttemptId applicationAttemptId;
   private OpportunisticContainerAllocator containerAllocator;
   private NMTokenSecretManagerInNM nmSecretManager;
@@ -157,17 +161,17 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
   }
 
   /**
-   * Check if we already have a NMToken. if Not, generate the Token and
-   * add it to the response
+   * Adds all the newly allocated Containers to the allocate Response.
+   * Additionally, in case the NMToken for one of the nodes does not exist, it
+   * generates one and adds it to the response.
    */
-  private void updateResponseWithNMTokens(AllocateResponse response,
+  private void updateAllocateResponse(AllocateResponse response,
       List<NMToken> nmTokens, List<Container> allocatedContainers) {
     List<NMToken> newTokens = new ArrayList<>();
     if (allocatedContainers.size() > 0) {
       response.getAllocatedContainers().addAll(allocatedContainers);
       for (Container alloc : allocatedContainers) {
-        if (!oppContainerContext.getNodeTokens().containsKey(
-            alloc.getNodeId())) {
+        if (!nodeTokens.containsKey(alloc.getNodeId())) {
           newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
         }
       }
@@ -179,17 +183,14 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
 
   private void updateParameters(
       RegisterDistributedSchedulingAMResponse registerResponse) {
-    oppContainerContext.getAppParams().setMinResource(
-        registerResponse.getMinContainerResource());
-    oppContainerContext.getAppParams().setMaxResource(
-        registerResponse.getMaxContainerResource());
-    oppContainerContext.getAppParams().setIncrementResource(
-        registerResponse.getIncrContainerResource());
-    if (oppContainerContext.getAppParams().getIncrementResource() == null) {
-      oppContainerContext.getAppParams().setIncrementResource(
-          oppContainerContext.getAppParams().getMinResource());
+    Resource incrementResource = registerResponse.getIncrContainerResource();
+    if (incrementResource == null) {
+      incrementResource = registerResponse.getMinContainerResource();
     }
-    oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+    oppContainerContext.updateAllocationParams(
+        registerResponse.getMinContainerResource(),
+        registerResponse.getMaxContainerResource(),
+        incrementResource,
         registerResponse.getContainerTokenExpiryInterval());
 
     oppContainerContext.getContainerIdGenerator()
@@ -198,14 +199,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
   }
 
   private void setNodeList(List<NodeId> nodeList) {
-    oppContainerContext.getNodeMap().clear();
-    addToNodeList(nodeList);
-  }
-
-  private void addToNodeList(List<NodeId> nodes) {
-    for (NodeId n : nodes) {
-      oppContainerContext.getNodeMap().put(n.getHost(), n);
-    }
+    oppContainerContext.updateNodeList(nodeList);
   }
 
   @Override
@@ -243,23 +237,14 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     setNodeList(dsResp.getNodesForScheduling());
     List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
     for (NMToken nmToken : nmTokens) {
-      oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
+      nodeTokens.put(nmToken.getNodeId(), nmToken);
     }
 
-    List<ContainerStatus> completedContainers =
-        dsResp.getAllocateResponse().getCompletedContainersStatuses();
-
-    // Only account for opportunistic containers
-    for (ContainerStatus cs : completedContainers) {
-      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        oppContainerContext.getContainersAllocated()
-            .remove(cs.getContainerId());
-      }
-    }
+    oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
 
     // Check if we have NM tokens for all the allocated containers. If not
     // generate one and update the response.
-    updateResponseWithNMTokens(
+    updateAllocateResponse(
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index a473b14..a7c0a50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
 import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
 
@@ -65,12 +67,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The OpportunisticContainerAllocatorAMService is started instead of the
@@ -88,17 +92,20 @@ public class OpportunisticContainerAllocatorAMService
       LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
 
   private final NodeQueueLoadMonitor nodeMonitor;
+  private final OpportunisticContainerAllocator oppContainerAllocator;
 
-  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
-      new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
-      new ConcurrentHashMap<>();
   private final int k;
 
+  private final long cacheRefreshInterval;
+  private List<NodeId> cachedNodeIds;
+  private long lastCacheUpdateTime;
+
   public OpportunisticContainerAllocatorAMService(RMContext rmContext,
       YarnScheduler scheduler) {
     super(OpportunisticContainerAllocatorAMService.class.getName(),
         rmContext, scheduler);
+    this.oppContainerAllocator = new OpportunisticContainerAllocator(
+        rmContext.getContainerTokenSecretManager(), 0);
     this.k = rmContext.getYarnConfiguration().getInt(
         YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
         YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
@@ -106,6 +113,8 @@ public class OpportunisticContainerAllocatorAMService
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
         YarnConfiguration.
             NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
+    this.cacheRefreshInterval = nodeSortInterval;
+    this.lastCacheUpdateTime = System.currentTimeMillis();
     NodeQueueLoadMonitor.LoadComparator comparator =
         NodeQueueLoadMonitor.LoadComparator.valueOf(
             rmContext.getYarnConfiguration().get(
@@ -172,6 +181,27 @@ public class OpportunisticContainerAllocatorAMService
   public RegisterApplicationMasterResponse registerApplicationMaster
       (RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
+    final ApplicationAttemptId appAttemptId = getAppAttemptId();
+    SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
+        rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+    if (appAttempt.getOpportunisticContainerContext() == null) {
+      OpportunisticContainerContext opCtx = new OpportunisticContainerContext();
+      opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
+          .ContainerIdGenerator() {
+        @Override
+        public long generateContainerId() {
+          return appAttempt.getAppSchedulingInfo().getNewContainerId();
+        }
+      });
+      int tokenExpiryInterval = getConfig()
+          .getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
+              YarnConfiguration.
+                  OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT);
+      opCtx.updateAllocationParams(createMinContainerResource(),
+          createMaxContainerResource(), createIncrContainerResource(),
+          tokenExpiryInterval);
+      appAttempt.setOpportunisticContainerContext(opCtx);
+    }
     return super.registerApplicationMaster(request);
   }
 
@@ -185,7 +215,30 @@ public class OpportunisticContainerAllocatorAMService
   @Override
   public AllocateResponse allocate(AllocateRequest request) throws
       YarnException, IOException {
-    return super.allocate(request);
+
+    final ApplicationAttemptId appAttemptId = getAppAttemptId();
+    SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
+        rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+    OpportunisticContainerContext oppCtx =
+        appAttempt.getOpportunisticContainerContext();
+    oppCtx.updateNodeList(getLeastLoadedNodes());
+    List<Container> oppContainers =
+        oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
+        ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
+
+    if (!oppContainers.isEmpty()) {
+      handleNewContainers(oppContainers, false);
+      appAttempt.updateNMTokens(oppContainers);
+    }
+
+    // Allocate all guaranteed containers
+    AllocateResponse allocateResp = super.allocate(request);
+
+    oppCtx.updateCompletedContainers(allocateResp);
+
+    // Add all opportunistic containers
+    allocateResp.getAllocatedContainers().addAll(oppContainers);
+    return allocateResp;
   }
 
   @Override
@@ -198,39 +251,9 @@ public class OpportunisticContainerAllocatorAMService
     RegisterDistributedSchedulingAMResponse dsResp = recordFactory
         .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
     dsResp.setRegisterResponse(response);
-    dsResp.setMinContainerResource(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
-                YarnConfiguration.
-                    OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setMaxContainerResource(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
-                YarnConfiguration
-                    .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setIncrContainerResource(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
-                YarnConfiguration.
-                    OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
-                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
-        )
-    );
+    dsResp.setMinContainerResource(createMinContainerResource());
+    dsResp.setMaxContainerResource(createMaxContainerResource());
+    dsResp.setIncrContainerResource(createIncrContainerResource());
     dsResp.setContainerTokenExpiryInterval(
         getConfig().getInt(
             YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
@@ -240,8 +263,7 @@ public class OpportunisticContainerAllocatorAMService
         this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
 
     // Set nodes to be used for scheduling
-    dsResp.setNodesForScheduling(
-        this.nodeMonitor.selectLeastLoadedNodes(this.k));
+    dsResp.setNodesForScheduling(getLeastLoadedNodes());
     return dsResp;
   }
 
@@ -250,47 +272,30 @@ public class OpportunisticContainerAllocatorAMService
       DistributedSchedulingAllocateRequest request)
       throws YarnException, IOException {
     List<Container> distAllocContainers = request.getAllocatedContainers();
-    for (Container container : distAllocContainers) {
+    handleNewContainers(distAllocContainers, true);
+    AllocateResponse response = allocate(request.getAllocateRequest());
+    DistributedSchedulingAllocateResponse dsResp = recordFactory
+        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+    dsResp.setAllocateResponse(response);
+    dsResp.setNodesForScheduling(getLeastLoadedNodes());
+    return dsResp;
+  }
+
+  private void handleNewContainers(List<Container> allocContainers,
+                                   boolean isRemotelyAllocated) {
+    for (Container container : allocContainers) {
       // Create RMContainer
       SchedulerApplicationAttempt appAttempt =
           ((AbstractYarnScheduler) rmContext.getScheduler())
               .getCurrentAttemptForContainer(container.getId());
       RMContainer rmContainer = new RMContainerImpl(container,
           appAttempt.getApplicationAttemptId(), container.getNodeId(),
-          appAttempt.getUser(), rmContext, true);
+          appAttempt.getUser(), rmContext, isRemotelyAllocated);
       appAttempt.addRMContainer(container.getId(), rmContainer);
       rmContainer.handle(
           new RMContainerEvent(container.getId(),
               RMContainerEventType.LAUNCHED));
     }
-    AllocateResponse response = allocate(request.getAllocateRequest());
-    DistributedSchedulingAllocateResponse dsResp = recordFactory
-        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
-    dsResp.setAllocateResponse(response);
-    dsResp.setNodesForScheduling(
-        this.nodeMonitor.selectLeastLoadedNodes(this.k));
-    return dsResp;
-  }
-
-  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-                            String rackName, NodeId nodeId) {
-    if (rackName != null) {
-      mapping.putIfAbsent(rackName, new HashSet<NodeId>());
-      Set<NodeId> nodeIds = mapping.get(rackName);
-      synchronized (nodeIds) {
-        nodeIds.add(nodeId);
-      }
-    }
-  }
-
-  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
-                                 String rackName, NodeId nodeId) {
-    if (rackName != null) {
-      Set<NodeId> nodeIds = mapping.get(rackName);
-      synchronized (nodeIds) {
-        nodeIds.remove(nodeId);
-      }
-    }
   }
 
   @Override
@@ -303,10 +308,6 @@ public class OpportunisticContainerAllocatorAMService
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
       nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
           nodeAddedEvent.getAddedRMNode());
-      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
-      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
-          nodeAddedEvent.getAddedRMNode().getNodeID());
       break;
     case NODE_REMOVED:
       if (!(event instanceof NodeRemovedSchedulerEvent)) {
@@ -315,12 +316,6 @@ public class OpportunisticContainerAllocatorAMService
       NodeRemovedSchedulerEvent nodeRemovedEvent =
           (NodeRemovedSchedulerEvent) event;
       nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
-      removeFromMapping(rackToNode,
-          nodeRemovedEvent.getRemovedRMNode().getRackName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
-      removeFromMapping(hostToNode,
-          nodeRemovedEvent.getRemovedRMNode().getHostName(),
-          nodeRemovedEvent.getRemovedRMNode().getNodeID());
       break;
     case NODE_UPDATE:
       if (!(event instanceof NodeUpdateSchedulerEvent)) {
@@ -364,4 +359,58 @@ public class OpportunisticContainerAllocatorAMService
   public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
     return nodeMonitor.getThresholdCalculator();
   }
+
+  private Resource createIncrContainerResource() {
+    return Resource.newInstance(
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
+            YarnConfiguration.
+                OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
+    );
+  }
+
+  private synchronized List<NodeId> getLeastLoadedNodes() {
+    long currTime = System.currentTimeMillis();
+    if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
+        || cachedNodeIds == null) {
+      cachedNodeIds = this.nodeMonitor.selectLeastLoadedNodes(this.k);
+      lastCacheUpdateTime = currTime;
+    }
+    return cachedNodeIds;
+  }
+
+  private Resource createMaxContainerResource() {
+    return Resource.newInstance(
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+            YarnConfiguration
+                .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
+    );
+  }
+
+  private Resource createMinContainerResource() {
+    return Resource.newInstance(
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
+            YarnConfiguration.
+                OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
+        getConfig().getInt(
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
+    );
+  }
+
+  private static ApplicationAttemptId getAppAttemptId() throws YarnException {
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
+    return applicationAttemptId;
+  }
 }
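
Two details of the service above are easy to miss. First, allocate() runs the
opportunistic allocator before delegating to super.allocate() for the
guaranteed path, then merges both container lists into one response. Second,
getLeastLoadedNodes() caches the sorted node list for cacheRefreshInterval
milliseconds, so per-AM heartbeats do not re-sort the cluster. A tuning sketch
for the two knobs it reads (the values are illustrative, not the defaults):

    Configuration conf = new YarnConfiguration();
    // How many least-loaded nodes are handed to the opportunistic allocator.
    conf.setInt(
        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, 10);
    // Node-sort interval; also reused as the node-list cache refresh period.
    conf.setLong(
        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 1000);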

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 5e9bece..d2d706d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1184,6 +1184,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
     Configuration config = this.rmContext.getYarnConfiguration();
     if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
         || YarnConfiguration.isDistSchedulingEnabled(config)) {
+      if (YarnConfiguration.isDistSchedulingEnabled(config) &&
+          !YarnConfiguration
+              .isOpportunisticContainerAllocationEnabled(config)) {
+        throw new YarnRuntimeException(
+            "Invalid parameters: opportunistic container allocation has to " +
+                "be enabled when distributed scheduling is enabled.");
+      }
       OpportunisticContainerAllocatorAMService
           oppContainerAllocatingAMService =
           new OpportunisticContainerAllocatorAMService(this.rmContext,
@@ -1193,9 +1200,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
               OpportunisticContainerAllocatorAMService.class.getName());
       // Add an event dispatcher for the
       // OpportunisticContainerAllocatorAMService to handle node
-      // updates/additions and removals.
-      // Since the SchedulerEvent is currently a super set of theses,
-      // we register interest for it..
+      // additions, updates and removals. Since the SchedulerEvent is currently
+      // a superset of these, we register interest for it.
       addService(oppContainerAllocEventDispatcher);
       rmDispatcher.register(SchedulerEventType.class,
           oppContainerAllocEventDispatcher);
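
The new guard makes the dependency explicit: distributed scheduling requires
centralized opportunistic allocation to be enabled as well. A configuration
sketch that satisfies the check, using the same YarnConfiguration constants as
this patch:

    Configuration conf = new YarnConfiguration();
    // Must be enabled whenever distributed scheduling is enabled.
    conf.setBoolean(
        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);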

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index adc3a97..9675fac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -68,6 +69,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpda
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -114,6 +117,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private boolean isAttemptRecovering;
 
   protected ResourceUsage attemptResourceUsage = new ResourceUsage();
+  /** Resource usage of opportunistic containers. */
+  protected ResourceUsage attemptOpportunisticResourceUsage =
+      new ResourceUsage();
   /** Scheduled by a remote scheduler. */
   protected ResourceUsage attemptResourceUsageAllocatedRemotely =
       new ResourceUsage();
@@ -132,6 +138,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   // by NM should not be recovered.
   private Set<ContainerId> pendingRelease = null;
 
+  private OpportunisticContainerContext oppContainerContext;
+
   /**
    * Count how many times the application has been given an opportunity to
    * schedule a task at each priority. Each time the scheduler asks the
@@ -199,7 +207,17 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     readLock = lock.readLock();
     writeLock = lock.writeLock();
   }
-  
+
+  public void setOpportunisticContainerContext(
+      OpportunisticContainerContext oppContext) {
+    this.oppContainerContext = oppContext;
+  }
+
+  public OpportunisticContainerContext
+      getOpportunisticContainerContext() {
+    return this.oppContainerContext;
+  }
+
   /**
    * Get the live containers of the application.
    * @return live containers of the application
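
The new accessor pair lets the OpportunisticContainerAllocatorAMService hang a
per-attempt allocation context off the scheduler attempt. A hedged usage
sketch (the attempt lookup and the no-arg constructor are illustrative
assumptions, not part of this patch):

    // Illustrative only: attach an opportunistic context to an attempt and
    // read it back where opportunistic requests are processed.
    SchedulerApplicationAttempt attempt =
        scheduler.getApplicationAttempt(appAttemptId);
    attempt.setOpportunisticContainerContext(new OpportunisticContainerContext());
    OpportunisticContainerContext ctx = attempt.getOpportunisticContainerContext();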
@@ -331,6 +349,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     try {
       writeLock.lock();
       liveContainers.put(id, rmContainer);
+      if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+        this.attemptOpportunisticResourceUsage.incUsed(
+            rmContainer.getAllocatedResource());
+      }
       if (rmContainer.isRemotelyAllocated()) {
         this.attemptResourceUsageAllocatedRemotely.incUsed(
             rmContainer.getAllocatedResource());
@@ -344,9 +366,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     try {
       writeLock.lock();
       RMContainer rmContainer = liveContainers.remove(containerId);
-      if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
-        this.attemptResourceUsageAllocatedRemotely.decUsed(
-            rmContainer.getAllocatedResource());
+      if (rmContainer != null) {
+        if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          this.attemptOpportunisticResourceUsage
+              .decUsed(rmContainer.getAllocatedResource());
+        }
+        if (rmContainer.isRemotelyAllocated()) {
+          this.attemptResourceUsageAllocatedRemotely
+              .decUsed(rmContainer.getAllocatedResource());
+        }
       }
     } finally {
       writeLock.unlock();
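
These two hunks keep attemptOpportunisticResourceUsage symmetric: each
OPPORTUNISTIC container entering liveContainers increments the usage, and its
removal decrements it by the same Resource. A small standalone sketch of that
invariant against the same ResourceUsage API (the demo class is hypothetical):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;

    public class OpportunisticUsageSketch {
      public static void main(String[] args) {
        ResourceUsage oppUsage = new ResourceUsage();
        Resource alloc = Resource.newInstance(1024, 1); // 1024 MB, 1 vcore
        oppUsage.incUsed(alloc); // container added to liveContainers
        oppUsage.decUsed(alloc); // container removed
        // Back to zero; an inc without a matching dec would leak usage.
        System.out.println(oppUsage.getUsed());
      }
    }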
@@ -612,12 +640,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
               container.getPriority(), rmContainer.getCreationTime(),
               this.logAggregationContext, rmContainer.getNodeLabelExpression(),
               containerType));
-      NMToken nmToken =
-          rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
-              getApplicationAttemptId(), container);
-      if (nmToken != null) {
-        updatedNMTokens.add(nmToken);
-      }
+      updateNMToken(container);
     } catch (IllegalArgumentException e) {
       // DNS might be down, skip returning this container.
       LOG.error("Error trying to assign container token and NM token to"
@@ -635,6 +658,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return container;
   }
 
+  public void updateNMTokens(Collection<Container> containers) {
+    for (Container container : containers) {
+      updateNMToken(container);
+    }
+  }
+
+  private void updateNMToken(Container container) {
+    NMToken nmToken =
+        rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
+            getApplicationAttemptId(), container);
+    if (nmToken != null) {
+      updatedNMTokens.add(nmToken);
+    }
+  }
+
  // Create container token and update NMToken altogether, if either of them fails for
   // some reason like DNS unavailable, do not return this container and keep it
   // in the newlyAllocatedContainers waiting to be refetched.
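
Extracting updateNMToken(Container) gives the existing container-token path
and the new public updateNMTokens(Collection) a single code path, so
containers allocated outside the regular scheduler cycle (e.g. opportunistic
ones) still get NM tokens recorded. A hedged call-site sketch (the variable
names and the accessor on the allocation result are illustrative):

    // Illustrative only: surface NM tokens for a batch of opportunistic
    // containers handed back by the opportunistic allocator.
    List<Container> oppContainers = oppAllocation.getContainers();
    appAttempt.updateNMTokens(oppContainers);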

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index 017a256..b80a17c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
@@ -103,16 +105,23 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       new ConcurrentHashMap<>();
   private final LoadComparator comparator;
   private QueueLimitCalculator thresholdCalculator;
+  private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
+  private ReentrantReadWriteLock clusterNodesLock =
+      new ReentrantReadWriteLock();
 
   Runnable computeTask = new Runnable() {
     @Override
     public void run() {
-      synchronized (sortedNodes) {
+      ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
+      writeLock.lock();
+      try {
         sortedNodes.clear();
         sortedNodes.addAll(sortNodes());
         if (thresholdCalculator != null) {
           thresholdCalculator.update();
         }
+      } finally {
+        writeLock.unlock();
       }
     }
   };
@@ -166,9 +175,16 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   @Override
   public void removeNode(RMNode removedRMNode) {
     LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
-    synchronized (this.clusterNodes) {
-      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
-        this.clusterNodes.remove(removedRMNode.getNodeID());
+    ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
+    writeLock.lock();
+    ClusterNode node;
+    try {
+      node = this.clusterNodes.remove(removedRMNode.getNodeID());
+    } finally {
+      writeLock.unlock();
+    }
+    if (LOG.isDebugEnabled()) {
+      if (node != null) {
         LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
       } else {
         LOG.debug("Node not in list!");
@@ -186,7 +202,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
     // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
     // UNLESS comparator is based on queue length.
-    synchronized (this.clusterNodes) {
+    ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
+    writeLock.lock();
+    try {
       ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
       if (currentNode == null) {
         if (estimatedQueueWaitTime != -1
@@ -222,6 +240,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
               "wait queue length [" + currentNode.queueLength + "]");
         }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -245,15 +265,22 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
    * @return ordered list of nodes
    */
   public List<NodeId> selectLeastLoadedNodes(int k) {
-    synchronized (this.sortedNodes) {
-      return ((k < this.sortedNodes.size()) && (k >= 0)) ?
+    ReentrantReadWriteLock.ReadLock readLock = sortedNodesLock.readLock();
+    readLock.lock();
+    try {
+      List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
           new ArrayList<>(this.sortedNodes).subList(0, k) :
           new ArrayList<>(this.sortedNodes);
+      return Collections.unmodifiableList(retVal);
+    } finally {
+      readLock.unlock();
     }
   }
 
   private List<NodeId> sortNodes() {
-    synchronized (this.clusterNodes) {
+    ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
+    readLock.lock();
+    try {
       ArrayList aList = new ArrayList<>(this.clusterNodes.values());
       List<NodeId> retList = new ArrayList<>();
       Object[] nodes = aList.toArray();
@@ -267,6 +294,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
         retList.add(((ClusterNode)nodes[j]).nodeId);
       }
       return retList;
+    } finally {
+      readLock.unlock();
     }
   }
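
The NodeQueueLoadMonitor changes replace coarse synchronized blocks with two
ReentrantReadWriteLocks: the periodic computeTask takes the write lock to
rebuild sortedNodes, while selectLeastLoadedNodes takes only a read lock and
now returns an unmodifiable snapshot. A generic, self-contained sketch of the
same pattern (pure JDK; class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class SortedSnapshot<T extends Comparable<T>> {
      private final List<T> sorted = new ArrayList<>();
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      // Writer side (computeTask's role): rebuild the sorted view.
      void rebuild(List<T> items) {
        lock.writeLock().lock();
        try {
          sorted.clear();
          sorted.addAll(items);
          Collections.sort(sorted);
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Reader side (selectLeastLoadedNodes' role): cheap shared lock, and a
      // defensive unmodifiable copy so callers cannot mutate shared state.
      List<T> topK(int k) {
        lock.readLock().lock();
        try {
          int n = (k >= 0 && k < sorted.size()) ? k : sorted.size();
          return Collections.unmodifiableList(
              new ArrayList<>(sorted.subList(0, n)));
        } finally {
          lock.readLock().unlock();
        }
      }
    }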
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10be4598/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 07c6b54..207f5ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSche
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -97,6 +98,11 @@ public class TestOpportunisticContainerAllocatorAMService {
       public Configuration getYarnConfiguration() {
         return new YarnConfiguration();
       }
+
+      @Override
+      public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+        return new RMContainerTokenSecretManager(conf);
+      }
     };
     Container c = factory.newRecordInstance(Container.class);
     c.setExecutionType(ExecutionType.OPPORTUNISTIC);
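
The overridden getContainerTokenSecretManager() extends the test's existing
pattern of stubbing RMContext accessors through an anonymous subclass, giving
the service a real RMContainerTokenSecretManager without standing up a full
RM. A minimal sketch of the technique (assuming RMContextImpl's test-visible
no-arg constructor; any base stub works equally well):

    // Illustrative test double: override only the accessor the code under
    // test touches, inherit everything else from the base context.
    RMContext stubContext = new RMContextImpl() {
      @Override
      public RMContainerTokenSecretManager getContainerTokenSecretManager() {
        return new RMContainerTokenSecretManager(conf);
      }
    };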
@@ -117,8 +123,8 @@ public class TestOpportunisticContainerAllocatorAMService {
     Server server = service.getServer(rpc, conf, addr, null);
     server.start();
 
-    // Verify that the DistrubutedSchedulingService can handle vanilla
-    // ApplicationMasterProtocol clients
+    // Verify that the OpportunisticContainerAllocatorAMService can handle
+    // vanilla ApplicationMasterProtocol clients
     RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
         ProtobufRpcEngine.class);
     ApplicationMasterProtocolPB ampProxy =

