YARN-4597. Introduce ContainerScheduler and a SCHEDULED state to NodeManager 
container lifecycle. (asuresh)

(cherry picked from commit 3219b7b4ac7d12aee343f6ab2980b3357fc618b6)
(cherry picked from commit 49921678858a17ca58ca2ae704ced1ff584e0859)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/368565f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/368565f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/368565f0

Branch: refs/heads/branch-2
Commit: 368565f0ce229c573665d38d95526c10fd5e4108
Parents: 047772f
Author: Arun Suresh <asur...@apache.org>
Authored: Tue Nov 15 07:48:55 2016 -0800
Committer: Arun Suresh <asur...@apache.org>
Committed: Fri Jan 6 11:15:09 2017 -0800

----------------------------------------------------------------------
 .../hadoop/mapred/TestMROpportunisticMaps.java  |  18 +-
 .../yarn/api/records/ContainerExitStatus.java   |   6 +
 .../hadoop/yarn/api/records/ContainerState.java |   4 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |  17 +-
 .../src/main/proto/yarn_protos.proto            |   2 +-
 .../api/impl/TestDistributedScheduling.java     |   3 +-
 .../yarn/client/api/impl/TestNMClient.java      |   9 +-
 .../TestOpportunisticContainerAllocation.java   |   2 +
 .../src/main/resources/yarn-default.xml         |   6 +-
 .../hadoop/yarn/server/nodemanager/Context.java |  18 +-
 .../yarn/server/nodemanager/NodeManager.java    |  42 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |  53 --
 .../containermanager/ContainerManager.java      |   4 +
 .../containermanager/ContainerManagerImpl.java  |  32 +-
 .../containermanager/container/Container.java   |   6 +
 .../container/ContainerImpl.java                |  95 +-
 .../container/ContainerState.java               |   2 +-
 .../launcher/ContainerLaunch.java               |  30 +-
 .../launcher/RecoveredContainerLaunch.java      |   4 +-
 .../monitor/ContainersMonitor.java              |  54 +-
 .../monitor/ContainersMonitorImpl.java          |  79 +-
 .../queuing/QueuingContainerManagerImpl.java    | 647 --------------
 .../containermanager/queuing/package-info.java  |  23 -
 ...locationBasedResourceUtilizationTracker.java | 137 +++
 .../scheduler/ContainerScheduler.java           | 419 +++++++++
 .../scheduler/ContainerSchedulerEvent.java      |  51 ++
 .../scheduler/ContainerSchedulerEventType.java  |  29 +
 .../scheduler/ResourceUtilizationTracker.java   |  59 ++
 .../scheduler/package-info.java                 |  22 +
 .../nodemanager/metrics/NodeManagerMetrics.java |  35 +
 .../nodemanager/webapp/ContainerLogsUtils.java  |   5 +-
 .../yarn/server/nodemanager/TestEventFlow.java  |   3 +-
 .../nodemanager/TestNodeManagerResync.java      |   8 +
 .../nodemanager/TestNodeManagerShutdown.java    |   5 +-
 .../nodemanager/TestNodeStatusUpdater.java      | 123 ---
 .../amrmproxy/BaseAMRMProxyTest.java            |   6 +-
 .../BaseContainerManagerTest.java               |  24 +-
 .../TestContainerManagerRecovery.java           |  35 +-
 .../TestContainerManagerRegression.java         |  84 --
 .../container/TestContainer.java                |  26 +-
 .../queuing/TestQueuingContainerManager.java    | 596 -------------
 .../TestContainerSchedulerQueuing.java          | 872 +++++++++++++++++++
 .../nodemanager/webapp/MockContainer.java       |  15 +
 ...pportunisticContainerAllocatorAMService.java |   4 +-
 .../rmcontainer/RMContainerImpl.java            |   4 +-
 .../resourcemanager/rmnode/RMNodeImpl.java      |  32 +-
 .../scheduler/AbstractYarnScheduler.java        |   2 +
 .../scheduler/SchedulerNode.java                |   9 +-
 .../hadoop/yarn/server/MiniYARNCluster.java     |  55 +-
 49 files changed, 1996 insertions(+), 1820 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
index 021863b..d975fd0 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
@@ -68,15 +68,6 @@ public class TestMROpportunisticMaps {
     doTest(4, 1, 1, 2);
   }
 
-  /**
-   * Test will run with 6 Maps and 2 Reducers. All the Maps are OPPORTUNISTIC.
-   * @throws Exception
-   */
-  @Test
-  public void testMultipleReducers() throws Exception {
-    doTest(6, 2, 1, 6);
-  }
-
   public void doTest(int numMappers, int numReducers, int numNodes,
       int percent) throws Exception {
     doTest(numMappers, numReducers, numNodes, 1000, percent);
@@ -94,7 +85,8 @@ public class TestMROpportunisticMaps {
       conf.setBoolean(YarnConfiguration.
           OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
       conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+      conf.setInt(
+          YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
       dfsCluster = new MiniDFSCluster.Builder(conf)
           .numDataNodes(numNodes).build();
       fileSystem = dfsCluster.getFileSystem();
@@ -104,11 +96,7 @@ public class TestMROpportunisticMaps {
       createInput(fileSystem, numMappers, numLines);
       // Run the test.
 
-      Configuration jobConf = mrCluster.getConfig();
-      jobConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-
-      runMergeTest(new JobConf(jobConf), fileSystem,
+      runMergeTest(new JobConf(conf), fileSystem,
           numMappers, numReducers, numLines, percent);
     } finally {
       if (dfsCluster != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
index f88fa3b..0207010 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
@@ -72,4 +72,10 @@ public class ContainerExitStatus {
    */
   public static final int KILLED_AFTER_APP_COMPLETION = -107;
 
+  /**
+   * Container was terminated by the ContainerScheduler to make room
+   * for another container...
+   */
+  public static final int KILLED_BY_CONTAINER_SCHEDULER = -108;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 582389f..4efd8c1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -36,6 +36,6 @@ public enum ContainerState {
   /** Completed container */
   COMPLETE,
 
-  /** Queued at the NM. */
-  QUEUED
+  /** Scheduled (awaiting resources) at the NM. */
+  SCHEDULED
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index efa8b9d..b97c434 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -376,12 +376,16 @@ public class YarnConfiguration extends Configuration {
   public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT =
       1.0f;
 
-  /** Min length of container queue at NodeManager. */
+  /** Min length of container queue at NodeManager. This is a cluster-wide
+   * configuration that acts as the lower-bound of optimal queue length
+   * calculated by the NodeQueueLoadMonitor */
   public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
       YARN_PREFIX + "nm-container-queuing.min-queue-length";
   public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1;
 
-  /** Max length of container queue at NodeManager. */
+  /** Max length of container queue at NodeManager. This is a cluster-wide
+   * configuration that acts as the upper-bound of optimal queue length
+   * calculated by the NodeQueueLoadMonitor */
   public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
       YARN_PREFIX + "nm-container-queuing.max-queue-length";
   public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
@@ -802,10 +806,11 @@ public class YarnConfiguration extends Configuration {
   /** Prefix for all node manager configs.*/
   public static final String NM_PREFIX = "yarn.nodemanager.";
 
-  /** Enable Queuing of <code>OPPORTUNISTIC</code> containers. */
-  public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX
-      + "container-queuing-enabled";
-  public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false;
+  /** Max Queue length of <code>OPPORTUNISTIC</code> containers on the NM. */
+  public static final String NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
+      NM_PREFIX + "opportunistic-containers-max-queue-length";
+  public static final int NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT =
+      0;
 
   /** Environment variables that will be sent to containers.*/
   public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index c1bb07e..d2db5b4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -82,7 +82,7 @@ enum ContainerStateProto {
   C_NEW = 1;
   C_RUNNING = 2;
   C_COMPLETE = 3;
-  C_QUEUED = 4;
+  C_SCHEDULED = 4;
 }
 
 message ContainerProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 9dbdb22..407aaa2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -108,7 +108,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
     conf.setBoolean(YarnConfiguration.
         OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+    conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+        10);
     cluster.init(conf);
     cluster.start();
     yarnConf = cluster.getConfig();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index 3640883..d211d6d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -330,6 +331,12 @@ public class TestNMClient {
           ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
       ContainerLaunchContext clc =
           Records.newRecord(ContainerLaunchContext.class);
+      if (Shell.WINDOWS) {
+        clc.setCommands(
+            Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul"));
+      } else {
+        clc.setCommands(Arrays.asList("sleep", "10"));
+      }
       clc.setTokens(securityTokens);
       try {
         nmClient.startContainer(container, clc);
@@ -415,7 +422,7 @@ public class TestNMClient {
     try {
       nmClient.increaseContainerResource(container);
     } catch (YarnException e) {
-      // NM container will only be in LOCALIZED state, so expect the increase
+      // NM container will only be in SCHEDULED state, so expect the increase
       // action to fail.
       if (!e.getMessage().contains(
           "can only be changed when a container is in RUNNING state")) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
index ace145d..802c207 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -111,6 +111,8 @@ public class TestOpportunisticContainerAllocation {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setBoolean(
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
     yarnCluster =
         new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 79df3ba..6b01749 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -973,10 +973,10 @@
   </property>
 
   <property>
-    <description>Enable Queuing of OPPORTUNISTIC containers on the
+    <description>Max number of OPPORTUNISTIC containers to queue at the
       nodemanager.</description>
-    <name>yarn.nodemanager.container-queuing-enabled</name>
-    <value>false</value>
+    <name>yarn.nodemanager.opportunistic-containers-max-queue-length</name>
+    <value>0</value>
   </property>
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 4e8527e..6680be8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -27,12 +27,12 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@@ -46,15 +46,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 public interface Context {
 
   /**
-   * Interface exposing methods related to the queuing of containers in the NM.
-   */
-  interface QueuingContext {
-    ConcurrentMap<ContainerId, ContainerTokenIdentifier> getQueuedContainers();
-
-    ConcurrentMap<ContainerTokenIdentifier, String> getKilledQueuedContainers();
-  }
-
-  /**
    * Return the nodeId. Usable only when the ContainerManager is started.
    * 
    * @return the NodeId
@@ -104,13 +95,6 @@ public interface Context {
 
   NodeStatusUpdater getNodeStatusUpdater();
 
-  /**
-   * Returns a <code>QueuingContext</code> that provides information about the
-   * number of Containers Queued as well as the number of Containers that were
-   * queued and killed.
-   */
-  QueuingContext getQueuingContext();
-
   boolean isDistributedSchedulingEnabled();
 
   OpportunisticContainerAllocator getContainerAllocator();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 3910cd1..6ec3dff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -56,14 +56,12 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
@@ -173,14 +171,8 @@ public class NodeManager extends CompositeService
       ContainerExecutor exec, DeletionService del,
       NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
       LocalDirsHandlerService dirsHandler) {
-    if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
-        YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
-      return new QueuingContainerManagerImpl(context, exec, del,
-          nodeStatusUpdater, metrics, dirsHandler);
-    } else {
-      return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
-          metrics, dirsHandler);
-    }
+    return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+        metrics, dirsHandler);
   }
 
   protected WebServer createWebServer(Context nmContext,
@@ -495,7 +487,6 @@ public class NodeManager extends CompositeService
 
     private OpportunisticContainerAllocator containerAllocator;
 
-    private final QueuingContext queuingContext;
     private ContainerExecutor executor;
 
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
@@ -513,7 +504,6 @@ public class NodeManager extends CompositeService
       this.stateStore = stateStore;
       this.logAggregationReportForApps = new ConcurrentLinkedQueue<
           LogAggregationReport>();
-      this.queuingContext = new QueuingNMContext();
       this.isDistSchedulingEnabled = isDistSchedulingEnabled;
       this.conf = conf;
     }
@@ -642,11 +632,6 @@ public class NodeManager extends CompositeService
       this.nodeStatusUpdater = nodeStatusUpdater;
     }
 
-    @Override
-    public QueuingContext getQueuingContext() {
-      return this.queuingContext;
-    }
-
     public boolean isDistributedSchedulingEnabled() {
       return isDistSchedulingEnabled;
     }
@@ -672,29 +657,6 @@ public class NodeManager extends CompositeService
   }
 
   /**
-   * Class that keeps the context for containers queued at the NM.
-   */
-  public static class QueuingNMContext implements Context.QueuingContext {
-    protected final ConcurrentMap<ContainerId, ContainerTokenIdentifier>
-        queuedContainers = new ConcurrentSkipListMap<>();
-
-    protected final ConcurrentMap<ContainerTokenIdentifier, String>
-        killedQueuedContainers = new ConcurrentHashMap<>();
-
-    @Override
-    public ConcurrentMap<ContainerId, ContainerTokenIdentifier>
-        getQueuedContainers() {
-      return this.queuedContainers;
-    }
-
-    @Override
-    public ConcurrentMap<ContainerTokenIdentifier, String>
-        getKilledQueuedContainers() {
-      return this.killedQueuedContainers;
-    }
-  }
-
-  /**
    * @return the node health checker
    */
   public NodeHealthCheckerService getNodeHealthChecker() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index ae2e8e4..7ec12cb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -62,7 +61,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -568,9 +566,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       }
     }
 
-    // Account for all containers that got killed while they were still queued.
-    pendingCompletedContainers.putAll(getKilledQueuedContainerStatuses());
-
     containerStatuses.addAll(pendingCompletedContainers.values());
 
     if (LOG.isDebugEnabled()) {
@@ -580,43 +575,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return containerStatuses;
   }
 
-  /**
-   * Add to the container statuses the status of the containers that got killed
-   * while they were queued.
-   */
-  private Map<ContainerId, ContainerStatus> getKilledQueuedContainerStatuses() {
-    Map<ContainerId, ContainerStatus> killedQueuedContainerStatuses =
-        new HashMap<>();
-    for (Map.Entry<ContainerTokenIdentifier, String> killedQueuedContainer :
-        this.context.getQueuingContext().
-            getKilledQueuedContainers().entrySet()) {
-      ContainerTokenIdentifier containerTokenId = killedQueuedContainer
-          .getKey();
-      ContainerId containerId = containerTokenId.getContainerID();
-      ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
-          containerId, ContainerState.COMPLETE,
-          killedQueuedContainer.getValue(), ContainerExitStatus.ABORTED,
-          containerTokenId.getResource(), containerTokenId.getExecutionType());
-      ApplicationId applicationId = containerId.getApplicationAttemptId()
-          .getApplicationId();
-      if (isApplicationStopped(applicationId)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(applicationId + " is completing, " + " remove "
-              + containerId + " from NM context.");
-        }
-        this.context.getQueuingContext().getKilledQueuedContainers()
-            .remove(containerTokenId);
-        killedQueuedContainerStatuses.put(containerId, containerStatus);
-      } else {
-        if (!isContainerRecentlyStopped(containerId)) {
-          killedQueuedContainerStatuses.put(containerId, containerStatus);
-        }
-      }
-      addCompletedContainer(containerId);
-    }
-    return killedQueuedContainerStatuses;
-  }
-
   private List<ApplicationId> getRunningApplications() {
     List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
     runningApplications.addAll(this.context.getApplications().keySet());
@@ -701,17 +659,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       }
     }
 
-    // Remove null containers from queuing context for killed queued containers.
-    Iterator<ContainerTokenIdentifier> killedQueuedContIter =
-        context.getQueuingContext().getKilledQueuedContainers().keySet().
-            iterator();
-    while (killedQueuedContIter.hasNext()) {
-      if (removedNullContainers.contains(
-          killedQueuedContIter.next().getContainerID())) {
-        killedQueuedContIter.remove();
-      }
-    }
-
     if (!removedContainers.isEmpty()) {
       LOG.info("Removed completed containers from NM context: "
           + removedContainers);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
index 1cbb8c7..066d987 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
@@ -26,6 +26,8 @@ import 
org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
     .ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler
+    .ContainerScheduler;
 
 /**
  * The ContainerManager is an entity that manages the life cycle of Containers.
@@ -42,4 +44,6 @@ public interface ContainerManager extends 
ServiceStateChangeListener,
 
   void setBlockNewContainerRequests(boolean blockNewContainerRequests);
 
+  ContainerScheduler getContainerScheduler();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 7c7cb81..74a01e6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -135,6 +135,9 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Change
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@@ -203,6 +206,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
   private final WriteLock writeLock;
   private AMRMProxyService amrmProxyService;
   protected boolean amrmProxyEnabled = false;
+  private final ContainerScheduler containerScheduler;
 
   private long waitForContainersOnShutdownMillis;
 
@@ -226,6 +230,8 @@ public class ContainerManagerImpl extends CompositeService 
implements
     addService(containersLauncher);
 
     this.nodeStatusUpdater = nodeStatusUpdater;
+    this.containerScheduler = createContainerScheduler(context);
+    addService(containerScheduler);
 
     // Start configurable services
     auxiliaryServices = new AuxServices();
@@ -243,7 +249,8 @@ public class ContainerManagerImpl extends CompositeService 
implements
     dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
     dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
     dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
-    
+    dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);
+
     addService(dispatcher);
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -295,6 +302,14 @@ public class ContainerManagerImpl extends CompositeService 
implements
     }
   }
 
+  @VisibleForTesting
+  protected ContainerScheduler createContainerScheduler(Context cntxt) {
+    // Currently, this dispatcher is shared by the ContainerManager,
+    // all the containers, the container monitor and the container scheduler.
+    // The ContainerScheduler may use its own dispatcher.
+    return new ContainerScheduler(cntxt, dispatcher, metrics);
+  }
+
   protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
     return new ContainersMonitorImpl(exec, dispatcher, this.context);
   }
@@ -1211,10 +1226,8 @@ public class ContainerManagerImpl extends 
CompositeService implements
       }
     } else {
       context.getNMStateStore().storeContainerKilled(containerID);
-      dispatcher.getEventHandler().handle(
-        new ContainerKillEvent(containerID,
-            ContainerExitStatus.KILLED_BY_APPMASTER,
-            "Container killed by the ApplicationMaster."));
+      container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER,
+          "Container killed by the ApplicationMaster.");
 
       NMAuditLogger.logSuccess(container.getUser(),    
         AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
@@ -1446,12 +1459,12 @@ public class ContainerManagerImpl extends 
CompositeService implements
 
   @Override
   public OpportunisticContainersStatus getOpportunisticContainersStatus() {
-    return OpportunisticContainersStatus.newInstance();
+    return this.containerScheduler.getOpportunisticContainersStatus();
   }
 
   @Override
   public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
-    LOG.trace("Implementation does not support queuing of Containers!!");
+    this.containerScheduler.updateQueuingLimit(queuingLimit);
   }
 
   @SuppressWarnings("unchecked")
@@ -1612,4 +1625,9 @@ public class ContainerManagerImpl extends 
CompositeService implements
       LOG.info("Container " + containerId + " no longer exists");
     }
   }
+
+  @Override
+  public ContainerScheduler getContainerScheduler() {
+    return this.containerScheduler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index f8a7e35..77ac357 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -80,7 +80,13 @@ public interface Container extends 
EventHandler<ContainerEvent> {
 
   boolean isReInitializing();
 
+  boolean isMarkedForKilling();
+
   boolean canRollback();
 
   void commitUpgrade();
+
+  void sendLaunchEvent();
+
+  void sendKillEvent(int exitStatus, String description);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 6b878aa..a2e8f07 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -72,7 +73,8 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
@@ -163,6 +165,7 @@ public class ContainerImpl implements Container {
   private String ips;
   private volatile ReInitializationContext reInitContext;
   private volatile boolean isReInitializing = false;
+  private volatile boolean isMarkeForKilling = false;
 
   /** The NM-wide configuration - not specific to this container */
   private final Configuration daemonConf;
@@ -284,7 +287,7 @@ public class ContainerImpl implements Container {
     // From NEW State
     .addTransition(ContainerState.NEW,
         EnumSet.of(ContainerState.LOCALIZING,
-            ContainerState.LOCALIZED,
+            ContainerState.SCHEDULED,
             ContainerState.LOCALIZATION_FAILED,
             ContainerState.DONE),
         ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
@@ -296,7 +299,7 @@ public class ContainerImpl implements Container {
 
     // From LOCALIZING State
     .addTransition(ContainerState.LOCALIZING,
-        EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
+        EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED),
         ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
     .addTransition(ContainerState.LOCALIZING,
         ContainerState.LOCALIZATION_FAILED,
@@ -307,7 +310,7 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER,
-        new KillDuringLocalizationTransition())
+        new KillBeforeRunningTransition())
 
     // From LOCALIZATION_FAILED State
     .addTransition(ContainerState.LOCALIZATION_FAILED,
@@ -332,17 +335,18 @@ public class ContainerImpl implements Container {
         ContainerState.LOCALIZATION_FAILED,
         ContainerEventType.RESOURCE_FAILED)
 
-    // From LOCALIZED State
-    .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
+    // From SCHEDULED State
+    .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
         ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
-    .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
+    .addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new ExitedWithFailureTransition(true))
-    .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
+    .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
-    .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
-        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER,
+        new KillBeforeRunningTransition())
 
     // From RUNNING State
     .addTransition(ContainerState.RUNNING,
@@ -351,7 +355,7 @@ public class ContainerImpl implements Container {
         new ExitedWithSuccessTransition(true))
     .addTransition(ContainerState.RUNNING,
         EnumSet.of(ContainerState.RELAUNCHING,
-            ContainerState.LOCALIZED,
+            ContainerState.SCHEDULED,
             ContainerState.EXITED_WITH_FAILURE),
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new RetryFailureTransition())
@@ -400,7 +404,7 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
     .addTransition(ContainerState.REINITIALIZING,
-        ContainerState.LOCALIZED,
+        ContainerState.SCHEDULED,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new KilledForReInitializationTransition())
 
@@ -518,9 +522,11 @@ public class ContainerImpl implements Container {
     case NEW:
     case LOCALIZING:
     case LOCALIZATION_FAILED:
-    case LOCALIZED:
+    case SCHEDULED:
+      return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
     case RUNNING:
     case RELAUNCHING:
+    case REINITIALIZING:
     case EXITED_WITH_SUCCESS:
     case EXITED_WITH_FAILURE:
     case KILLING:
@@ -547,7 +553,7 @@ public class ContainerImpl implements Container {
   public Map<Path, List<String>> getLocalizedResources() {
     this.readLock.lock();
     try {
-      if (ContainerState.LOCALIZED == getContainerState()
+      if (ContainerState.SCHEDULED == getContainerState()
           || ContainerState.RELAUNCHING == getContainerState()) {
         return resourceSet.getLocalizedResources();
       } else {
@@ -680,7 +686,15 @@ public class ContainerImpl implements Container {
     // Inform the application
     @SuppressWarnings("rawtypes")
     EventHandler eventHandler = dispatcher.getEventHandler();
-    eventHandler.handle(new ApplicationContainerFinishedEvent(containerId));
+
+    ContainerStatus containerStatus = cloneAndGetContainerStatus();
+    eventHandler.handle(
+        new ApplicationContainerFinishedEvent(
+            containerStatus.getContainerId()));
+
+    // Tell the scheduler the container is Done
+    eventHandler.handle(new ContainerSchedulerEvent(this,
+        ContainerSchedulerEventType.CONTAINER_COMPLETED));
     // Remove the container from the resource-monitor
     eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
     // Tell the logService too
@@ -689,7 +703,8 @@ public class ContainerImpl implements Container {
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed
-  private void sendLaunchEvent() {
+  @Override
+  public void sendLaunchEvent() {
     ContainersLauncherEventType launcherEvent =
         ContainersLauncherEventType.LAUNCH_CONTAINER;
     if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
@@ -702,6 +717,22 @@ public class ContainerImpl implements Container {
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendScheduleEvent() {
+    dispatcher.getEventHandler().handle(
+        new ContainerSchedulerEvent(this,
+            ContainerSchedulerEventType.SCHEDULE_CONTAINER)
+    );
+  }
+
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  @Override
+  public void sendKillEvent(int exitStatus, String description) {
+    this.isMarkeForKilling = true;
+    dispatcher.getEventHandler().handle(
+        new ContainerKillEvent(containerId, exitStatus, description));
+  }
+
+  @SuppressWarnings("unchecked") // dispatcher not typed
   private void sendRelaunchEvent() {
     ContainersLauncherEventType launcherEvent =
         ContainersLauncherEventType.RELAUNCH_CONTAINER;
@@ -772,7 +803,7 @@ public class ContainerImpl implements Container {
    * to the ResourceLocalizationManager and enters LOCALIZING state.
    * 
    * If there are no resources to localize, sends LAUNCH_CONTAINER event
-   * and enters LOCALIZED state directly.
+   * and enters SCHEDULED state directly.
    * 
    * If there are any invalid resources specified, enters LOCALIZATION_FAILED
    * directly.
@@ -838,9 +869,9 @@ public class ContainerImpl implements Container {
         }
         return ContainerState.LOCALIZING;
       } else {
-        container.sendLaunchEvent();
+        container.sendScheduleEvent();
         container.metrics.endInitingContainer();
-        return ContainerState.LOCALIZED;
+        return ContainerState.SCHEDULED;
       }
     }
   }
@@ -880,7 +911,7 @@ public class ContainerImpl implements Container {
           new ContainerLocalizationEvent(LocalizationEventType.
               CONTAINER_RESOURCES_LOCALIZED, container));
 
-      container.sendLaunchEvent();
+      container.sendScheduleEvent();
       container.metrics.endInitingContainer();
 
       // If this is a recovered container that has already launched, skip
@@ -900,7 +931,7 @@ public class ContainerImpl implements Container {
                 SharedCacheUploadEventType.UPLOAD));
       }
 
-      return ContainerState.LOCALIZED;
+      return ContainerState.SCHEDULED;
     }
   }
 
@@ -1090,7 +1121,7 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Transition from LOCALIZED state to RUNNING state upon receiving
+   * Transition from SCHEDULED state to RUNNING state upon receiving
    * a CONTAINER_LAUNCHED event.
    */
   static class LaunchTransition extends ContainerTransition {
@@ -1248,7 +1279,7 @@ public class ContainerImpl implements Container {
             container.containerId.getApplicationAttemptId().getApplicationId(),
             container.containerId);
         new KilledForReInitializationTransition().transition(container, event);
-        return ContainerState.LOCALIZED;
+        return ContainerState.SCHEDULED;
       } else {
         new ExitedWithFailureTransition(true).transition(container, event);
         return ContainerState.EXITED_WITH_FAILURE;
@@ -1330,7 +1361,7 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Transition to LOCALIZED and wait for RE-LAUNCH
+   * Transition to SCHEDULED and wait for RE-LAUNCH
    */
   static class KilledForReInitializationTransition extends ContainerTransition {
 
@@ -1354,8 +1385,8 @@ public class ContainerImpl implements Container {
 
       container.resourceSet =
           container.reInitContext.mergedResourceSet(container.resourceSet);
-
-      container.sendLaunchEvent();
+      container.isMarkeForKilling = false;
+      container.sendScheduleEvent();
     }
   }
 
@@ -1383,7 +1414,7 @@ public class ContainerImpl implements Container {
    * Transition from LOCALIZING to KILLING upon receiving
    * KILL_CONTAINER event.
    */
-  static class KillDuringLocalizationTransition implements
+  static class KillBeforeRunningTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
@@ -1415,7 +1446,7 @@ public class ContainerImpl implements Container {
 
   /**
    * Transitions upon receiving KILL_CONTAINER.
-   * - LOCALIZED -> KILLING.
+   * - SCHEDULED -> KILLING.
    * - RUNNING -> KILLING.
    * - REINITIALIZING -> KILLING.
    */
@@ -1641,7 +1672,8 @@ public class ContainerImpl implements Container {
             stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitionException e) {
         LOG.warn("Can't handle this event at current state: Current: ["
-            + oldState + "], eventType: [" + event.getType() + "]", e);
+            + oldState + "], eventType: [" + event.getType() + "]," +
+            " container: [" + containerID + "]", e);
       }
       if (newState != null && oldState != newState) {
         LOG.info("Container " + containerID + " transitioned from "
@@ -1700,6 +1732,11 @@ public class ContainerImpl implements Container {
   }
 
   @Override
+  public boolean isMarkedForKilling() {
+    return this.isMarkeForKilling;
+  }
+
+  @Override
   public boolean canRollback() {
     return (this.reInitContext != null)
         && (this.reInitContext.canRollback());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 70de90c..91d1356 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
 public enum ContainerState {
-  NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING,
+  NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
   REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
   CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index d774030..823457f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -104,9 +104,10 @@ public class ContainerLaunch implements Callable<Integer> {
   private final Context context;
   private final ContainerManagerImpl containerManager;
   
-  protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+  protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
   protected AtomicBoolean completed = new AtomicBoolean(false);
 
+  private volatile boolean killedBeforeStart = false;
   private long sleepDelayBeforeSigKill = 250;
   private long maxKillWaitTime = 2000;
 
@@ -401,7 +402,12 @@ public class ContainerLaunch implements Callable<Integer> {
   @SuppressWarnings("unchecked")
   protected int launchContainer(ContainerStartContext ctx) throws IOException {
     ContainerId containerId = container.getContainerId();
-
+    if (container.isMarkedForKilling()) {
+      LOG.info("Container " + containerId + " not launched as it has already "
+          + "been marked for Killing");
+      this.killedBeforeStart = true;
+      return ExitCode.TERMINATED.getExitCode();
+    }
     // LaunchContainer is a blocking call. We are here almost means the
     // container is launched, so send out the event.
     dispatcher.getEventHandler().handle(new ContainerEvent(
@@ -410,7 +416,7 @@ public class ContainerLaunch implements Callable<Integer> {
     context.getNMStateStore().storeContainerLaunched(containerId);
 
     // Check if the container is signalled to be killed.
-    if (!shouldLaunchContainer.compareAndSet(false, true)) {
+    if (!containerAlreadyLaunched.compareAndSet(false, true)) {
       LOG.info("Container " + containerId + " not launched as "
           + "cleanup already called");
       return ExitCode.TERMINATED.getExitCode();
@@ -451,10 +457,14 @@ public class ContainerLaunch implements Callable<Integer> 
{
         || exitCode == ExitCode.TERMINATED.getExitCode()) {
       // If the process was killed, Send container_cleanedup_after_kill and
       // just break out of this method.
-      dispatcher.getEventHandler().handle(
-          new ContainerExitEvent(containerId,
-              ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
-              diagnosticInfo.toString()));
+
+      // If the container was killed before starting, skip sending this event.
+      if (!killedBeforeStart) {
+        dispatcher.getEventHandler().handle(
+            new ContainerExitEvent(containerId,
+                ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
+                diagnosticInfo.toString()));
+      }
     } else if (exitCode != 0) {
       handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
           diagnosticInfo);
@@ -565,7 +575,8 @@ public class ContainerLaunch implements Callable<Integer> {
     }
 
     // launch flag will be set to true if process already launched
-    boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+    boolean alreadyLaunched =
+        !containerAlreadyLaunched.compareAndSet(false, true);
     if (!alreadyLaunched) {
       LOG.info("Container " + containerIdStr + " not launched."
           + " No cleanup needed to be done");
@@ -660,7 +671,8 @@ public class ContainerLaunch implements Callable<Integer> {
 
     LOG.info("Sending signal " + command + " to container " + containerIdStr);
 
-    boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+    boolean alreadyLaunched =
+        !containerAlreadyLaunched.compareAndSet(false, true);
     if (!alreadyLaunched) {
       LOG.info("Container " + containerIdStr + " not launched."
           + " Not sending the signal");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
index 3cd31b7..a04a23f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
@@ -39,7 +39,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+
 
 /**
  * This is a ContainerLaunch which has been recovered after an NM restart (for
@@ -57,7 +57,7 @@ public class RecoveredContainerLaunch extends ContainerLaunch 
{
   {
     super(context, configuration, dispatcher, exec, app, container, dirsHandler,
       containerManager);
-    this.shouldLaunchContainer.set(true);
+    this.containerAlreadyLaunched.set(true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 1069b4f..f27e8d9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -19,29 +19,53 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
 
 public interface ContainersMonitor extends Service,
     EventHandler<ContainersMonitorEvent>, ResourceView {
-  public ResourceUtilization getContainersUtilization();
+  ResourceUtilization getContainersUtilization();
 
-  ResourceUtilization getContainersAllocation();
-
-  boolean hasResourcesAvailable(ProcessTreeInfo pti);
-
-  void increaseContainersAllocation(ProcessTreeInfo pti);
-
-  void decreaseContainersAllocation(ProcessTreeInfo pti);
-
-  void increaseResourceUtilization(ResourceUtilization resourceUtil,
-      ProcessTreeInfo pti);
-
-  void decreaseResourceUtilization(ResourceUtilization resourceUtil,
-      ProcessTreeInfo pti);
+  float getVmemRatio();
 
   void subtractNodeResourcesFromResourceUtilization(
       ResourceUtilization resourceUtil);
+
+  class ContainerManagerUtils {
+    /**
+     * Utility method to add a {@link Resource} to the
+     * {@link ResourceUtilization}.
+     * @param containersMonitor Containers Monitor.
+     * @param resourceUtil Resource Utilization.
+     * @param resource Resource.
+     */
+    public static void increaseResourceUtilization(
+        ContainersMonitor containersMonitor, ResourceUtilization resourceUtil,
+        Resource resource) {
+      float vCores = (float) resource.getVirtualCores() /
+          containersMonitor.getVCoresAllocatedForContainers();
+      int vmem = (int) (resource.getMemorySize()
+          * containersMonitor.getVmemRatio());
+      resourceUtil.addTo((int)resource.getMemorySize(), vmem, vCores);
+    }
+
+    /**
+     * Utility method to subtract a {@link Resource} from the
+     * {@link ResourceUtilization}.
+     * @param containersMonitor Containers Monitor.
+     * @param resourceUtil Resource Utilization.
+     * @param resource Resource.
+     */
+    public static void decreaseResourceUtilization(
+        ContainersMonitor containersMonitor, ResourceUtilization resourceUtil,
+        Resource resource) {
+      float vCores = (float) resource.getVirtualCores() /
+          containersMonitor.getVCoresAllocatedForContainers();
+      int vmem = (int) (resource.getMemorySize()
+          * containersMonitor.getVmemRatio());
+      resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 1db2357..d1ec000 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -82,9 +82,6 @@ public class ContainersMonitorImpl extends AbstractService implements
   private int nodeCpuPercentageForYARN;
 
   private ResourceUtilization containersUtilization;
-  // Tracks the aggregated allocation of the currently allocated containers
-  // when queuing of containers at the NMs is enabled.
-  private ResourceUtilization containersAllocation;
 
   private volatile boolean stopped = false;
 
@@ -99,7 +96,6 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.monitoringThread = new MonitoringThread();
 
     this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
-    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
   }
 
   @Override
@@ -630,6 +626,8 @@ public class ContainersMonitorImpl extends AbstractService implements
       LOG.warn("Container " + containerId.toString() + "does not exist");
       return;
     }
+    // YARN-5860: Route this through the ContainerScheduler to
+    //       fix containerAllocation
     container.setResource(resource);
   }
 
@@ -729,67 +727,6 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.containersUtilization = utilization;
   }
 
-  public ResourceUtilization getContainersAllocation() {
-    return this.containersAllocation;
-  }
-
-  /**
-   * @return true if there are available allocated resources for the given
-   *         container to start.
-   */
-  @Override
-  public boolean hasResourcesAvailable(ProcessTreeInfo pti) {
-    synchronized (this.containersAllocation) {
-      // Check physical memory.
-      if (this.containersAllocation.getPhysicalMemory() +
-          (int) (pti.getPmemLimit() >> 20) >
-          (int) (getPmemAllocatedForContainers() >> 20)) {
-        return false;
-      }
-      // Check virtual memory.
-      if (isVmemCheckEnabled() &&
-          this.containersAllocation.getVirtualMemory() +
-          (int) (pti.getVmemLimit() >> 20) >
-          (int) (getVmemAllocatedForContainers() >> 20)) {
-        return false;
-      }
-      // Check CPU.
-      if (this.containersAllocation.getCPU()
-          + allocatedCpuUsage(pti) > 1.0f) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public void increaseContainersAllocation(ProcessTreeInfo pti) {
-    synchronized (this.containersAllocation) {
-      increaseResourceUtilization(this.containersAllocation, pti);
-    }
-  }
-
-  @Override
-  public void decreaseContainersAllocation(ProcessTreeInfo pti) {
-    synchronized (this.containersAllocation) {
-      decreaseResourceUtilization(this.containersAllocation, pti);
-    }
-  }
-
-  @Override
-  public void increaseResourceUtilization(ResourceUtilization resourceUtil,
-      ProcessTreeInfo pti) {
-    resourceUtil.addTo((int) (pti.getPmemLimit() >> 20),
-        (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
-  }
-
-  @Override
-  public void decreaseResourceUtilization(ResourceUtilization resourceUtil,
-      ProcessTreeInfo pti) {
-    resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20),
-        (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
-  }
-
   @Override
   public void subtractNodeResourcesFromResourceUtilization(
       ResourceUtilization resourceUtil) {
@@ -797,14 +734,9 @@ public class ContainersMonitorImpl extends AbstractService implements
         (int) (getVmemAllocatedForContainers() >> 20), 1.0f);
   }
 
-  /**
-   * Calculates the vCores CPU usage that is assigned to the given
-   * {@link ProcessTreeInfo}. In particular, it takes into account the number of
-   * vCores that are allowed to be used by the NM and returns the CPU usage
-   * as a normalized value between {@literal >=} 0 and {@literal <=} 1.
-   */
-  private float allocatedCpuUsage(ProcessTreeInfo pti) {
-    return (float) pti.getCpuVcores() / getVCoresAllocatedForContainers();
+  @Override
+  public float getVmemRatio() {
+    return vmemRatio;
   }
 
   @Override
@@ -875,5 +807,4 @@ public class ContainersMonitorImpl extends AbstractService implements
             startEvent.getVmemLimit(), startEvent.getPmemLimit(),
             startEvent.getCpuVcores()));
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to