YARN-5823. Update NMTokens in case of requests with only opportunistic 
containers. (Konstantinos Karanasos via asuresh)

(cherry picked from commit 283fa33febe043bd7b4fa87546be26c9c5a8f8b5)
(cherry picked from commit 1c4cc88a754ac9f557cdc8c859b8aadec19a5067)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/047772f1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/047772f1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/047772f1

Branch: refs/heads/branch-2
Commit: 047772f15f1ba22d41d323639a10fb4b9477a981
Parents: 0e60c7c
Author: Arun Suresh <asur...@apache.org>
Authored: Wed Nov 9 00:11:25 2016 -0800
Committer: Arun Suresh <asur...@apache.org>
Committed: Fri Jan 6 11:15:09 2017 -0800

----------------------------------------------------------------------
 .../TestOpportunisticContainerAllocation.java   | 71 +++++++++++++++++++-
 .../OpportunisticContainerAllocator.java        | 55 ++++++++-------
 .../containermanager/ContainerManagerImpl.java  |  2 +-
 .../scheduler/DistributedScheduler.java         | 19 ++++--
 .../ApplicationMasterService.java               |  3 +-
 ...pportunisticContainerAllocatorAMService.java | 23 ++++++-
 6 files changed, 137 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/047772f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
index b9b4b02..ace145d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -229,6 +229,9 @@ public class TestOpportunisticContainerAllocation {
 
       amClient.registerApplicationMaster("Host", 10000, "");
 
+      testOpportunisticAllocation(
+          (AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
+
       testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
 
       amClient
@@ -247,7 +250,6 @@ public class TestOpportunisticContainerAllocation {
       final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
       throws YarnException, IOException {
     // setup container request
-
     assertEquals(0, amClient.ask.size());
     assertEquals(0, amClient.release.size());
 
@@ -388,6 +390,73 @@ public class TestOpportunisticContainerAllocation {
     assertEquals(0, amClient.release.size());
   }
 
+  /**
+   * Tests allocation with requests comprising only opportunistic containers.
+   */
+  private void testOpportunisticAllocation(
+      final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
+      throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(2, oppContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    int iterationsLeft = 10;
+    Set<ContainerId> releases = new TreeSet<>();
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        allocatedContainerCount++;
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(1, receivedNMTokens.values().size());
+  }
+
   private void sleep(int sleepTime) {
     try {
       Thread.sleep(sleepTime);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047772f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 4410db1..16436bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -157,12 +156,18 @@ public class OpportunisticContainerAllocator {
     }
   }
 
-  static class PartitionedResourceRequests {
+  /**
+   * Class that includes two lists of {@link ResourceRequest}s: one for
+   * GUARANTEED and one for OPPORTUNISTIC {@link ResourceRequest}s.
+   */
+  public static class PartitionedResourceRequests {
     private List<ResourceRequest> guaranteed = new ArrayList<>();
     private List<ResourceRequest> opportunistic = new ArrayList<>();
+
     public List<ResourceRequest> getGuaranteed() {
       return guaranteed;
     }
+
     public List<ResourceRequest> getOpportunistic() {
       return opportunistic;
     }
@@ -186,10 +191,10 @@ public class OpportunisticContainerAllocator {
   }
 
   /**
-   * Entry point into the Opportunistic Container Allocator.
+   * Allocate OPPORTUNISTIC containers.
    * @param request AllocateRequest
    * @param applicationAttemptId ApplicationAttemptId
-   * @param appContext App Specific OpportunisticContainerContext
+   * @param opportContext App specific OpportunisticContainerContext
    * @param rmIdentifier RM Identifier
    * @param appSubmitter App Submitter
    * @return List of Containers.
@@ -197,37 +202,31 @@ public class OpportunisticContainerAllocator {
    */
   public List<Container> allocateContainers(
       AllocateRequest request, ApplicationAttemptId applicationAttemptId,
-      OpportunisticContainerContext appContext, long rmIdentifier,
+      OpportunisticContainerContext opportContext, long rmIdentifier,
       String appSubmitter) throws YarnException {
-    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks =
-        partitionAskList(request.getAskList());
-
-    if (partitionedAsks.getOpportunistic().isEmpty()) {
-      return Collections.emptyList();
-    }
-
+    // Update released containers.
     List<ContainerId> releasedContainers = request.getReleaseList();
     int numReleasedContainers = releasedContainers.size();
     if (numReleasedContainers > 0) {
       LOG.info("AttemptID: " + applicationAttemptId + " released: "
           + numReleasedContainers);
-      appContext.getContainersAllocated().removeAll(releasedContainers);
+      opportContext.getContainersAllocated().removeAll(releasedContainers);
     }
 
-    // Also, update black list
+    // Update black list.
     ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
     if (rbr != null) {
-      appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
-      appContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+      opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
+      opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
     }
 
-    // Add OPPORTUNISTIC reqs to the outstanding reqs
-    appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic());
+    // Add OPPORTUNISTIC requests to the outstanding ones.
+    opportContext.addToOutstandingReqs(request.getAskList());
 
+    // Satisfy the outstanding OPPORTUNISTIC requests.
     List<Container> allocatedContainers = new ArrayList<>();
     for (Priority priority :
-        appContext.getOutstandingOpReqs().descendingKeySet()) {
+        opportContext.getOutstandingOpReqs().descendingKeySet()) {
       // Allocated containers :
       //  Key = Requested Capability,
       //  Value = List of Containers of given cap (the actual container size
@@ -235,16 +234,14 @@ public class OpportunisticContainerAllocator {
       //          we need the requested capability (key) to match against
       //          the outstanding reqs)
       Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
-          appContext, priority, applicationAttemptId, appSubmitter);
+          opportContext, priority, applicationAttemptId, appSubmitter);
       for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        appContext.matchAllocationToOutstandingRequest(
+        opportContext.matchAllocationToOutstandingRequest(
             e.getKey(), e.getValue());
         allocatedContainers.addAll(e.getValue());
       }
     }
 
-    // Send all the GUARANTEED Reqs to RM
-    request.setAskList(partitionedAsks.getGuaranteed());
     return allocatedContainers;
   }
 
@@ -359,8 +356,14 @@ public class OpportunisticContainerAllocator {
     return containerToken;
   }
 
-  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
-      askList) {
+  /**
+   * Partitions a list of ResourceRequest to two separate lists, one for
+   * GUARANTEED and one for OPPORTUNISTIC ResourceRequests.
+   * @param askList the list of ResourceRequests to be partitioned
+   * @return the partitioned ResourceRequests
+   */
+  public PartitionedResourceRequests partitionAskList(
+      List<ResourceRequest> askList) {
     PartitionedResourceRequests partitionedRequests =
         new PartitionedResourceRequests();
     for (ResourceRequest rr : askList) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047772f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index d350abd..7c7cb81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -1446,7 +1446,7 @@ public class ContainerManagerImpl extends CompositeService implements
 
   @Override
   public OpportunisticContainersStatus getOpportunisticContainersStatus() {
-    return null;
+    return OpportunisticContainersStatus.newInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047772f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index a12d16a..0f47c93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -220,16 +220,27 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
   public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
       DistributedSchedulingAllocateRequest request)
       throws YarnException, IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Forwarding allocate request to the" +
-          "Distributed Scheduler Service on YARN RM");
-    }
+
+    // Partition requests to GUARANTEED and OPPORTUNISTIC.
+    OpportunisticContainerAllocator.PartitionedResourceRequests
+        partitionedAsks = containerAllocator
+        .partitionAskList(request.getAllocateRequest().getAskList());
+
+    // Allocate OPPORTUNISTIC containers.
+    request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
     List<Container> allocatedContainers =
         containerAllocator.allocateContainers(
             request.getAllocateRequest(), applicationAttemptId,
             oppContainerContext, rmIdentifier, appSubmitter);
 
+    // Prepare request for sending to RM for scheduling GUARANTEED containers.
     request.setAllocatedContainers(allocatedContainers);
+    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Forwarding allocate request to the" +
+          "Distributed Scheduler Service on YARN RM");
+    }
 
     DistributedSchedulingAllocateResponse dsResp =
         getNextInterceptor().allocateForDistributedScheduling(request);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047772f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 108c327..b451f68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -537,7 +537,8 @@ public class ApplicationMasterService extends AbstractService implements
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
       AllocateResponse allocateResponse =
           recordFactory.newRecordInstance(AllocateResponse.class);
-      if (!allocation.getContainers().isEmpty()) {
+      if (allocation.getNMTokens() != null &&
+          !allocation.getNMTokens().isEmpty()) {
         allocateResponse.setNMTokens(allocation.getNMTokens());
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047772f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 19997e4..bf605ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -220,34 +220,51 @@ public class OpportunisticContainerAllocatorAMService
   public AllocateResponse allocate(AllocateRequest request) throws
       YarnException, IOException {
 
+    // Partition requests to GUARANTEED and OPPORTUNISTIC.
+    OpportunisticContainerAllocator.PartitionedResourceRequests
+        partitionedAsks =
+        oppContainerAllocator.partitionAskList(request.getAskList());
+
+    // Allocate OPPORTUNISTIC containers.
+    request.setAskList(partitionedAsks.getOpportunistic());
     final ApplicationAttemptId appAttemptId = getAppAttemptId();
     SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
         rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+
     OpportunisticContainerContext oppCtx =
         appAttempt.getOpportunisticContainerContext();
     oppCtx.updateNodeList(getLeastLoadedNodes());
+
     List<Container> oppContainers =
         oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
         ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
 
+    // Create RMContainers and update the NMTokens.
     if (!oppContainers.isEmpty()) {
       handleNewContainers(oppContainers, false);
       appAttempt.updateNMTokens(oppContainers);
     }
 
-    // Allocate all guaranteed containers
+    // Allocate GUARANTEED containers.
+    request.setAskList(partitionedAsks.getGuaranteed());
     AllocateResponse allocateResp = super.allocate(request);
 
+    // Add allocated OPPORTUNISTIC containers to the AllocateResponse.
+    if (!oppContainers.isEmpty()) {
+      allocateResp.getAllocatedContainers().addAll(oppContainers);
+    }
+
+    // Update opportunistic container context with the allocated GUARANTEED
+    // containers.
     oppCtx.updateCompletedContainers(allocateResp);
 
     // Add all opportunistic containers
-    allocateResp.getAllocatedContainers().addAll(oppContainers);
     return allocateResp;
   }
 
   @Override
   public RegisterDistributedSchedulingAMResponse
-  registerApplicationMasterForDistributedScheduling(
+      registerApplicationMasterForDistributedScheduling(
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
     RegisterApplicationMasterResponse response =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to