YARN-5764. NUMA awareness support for launching containers. Contributed by 
Devaraj K.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a82d4a2e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a82d4a2e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a82d4a2e

Branch: refs/heads/HDFS-12996
Commit: a82d4a2e3a6a5448e371cef0cb86d5dbe4871ccd
Parents: 45cccad
Author: Miklos Szegedi <szege...@apache.org>
Authored: Tue Mar 13 11:03:27 2018 -0700
Committer: Miklos Szegedi <szege...@apache.org>
Committed: Tue Mar 13 12:36:57 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  27 ++
 .../src/main/resources/yarn-default.xml         |  51 +++
 .../nodemanager/LinuxContainerExecutor.java     |  18 +-
 .../linux/privileged/PrivilegedOperation.java   |   3 +-
 .../linux/resources/ResourceHandlerModule.java  |  10 +
 .../linux/resources/numa/NumaNodeResource.java  | 204 +++++++++++
 .../resources/numa/NumaResourceAllocation.java  |  69 ++++
 .../resources/numa/NumaResourceAllocator.java   | 342 +++++++++++++++++++
 .../resources/numa/NumaResourceHandlerImpl.java | 108 ++++++
 .../linux/resources/numa/package-info.java      |  28 ++
 .../numa/TestNumaResourceAllocator.java         | 281 +++++++++++++++
 .../numa/TestNumaResourceHandlerImpl.java       | 181 ++++++++++
 12 files changed, 1318 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6677478..2afff43 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3585,6 +3585,22 @@ public class YarnConfiguration extends Configuration {
       DEFAULT_TIMELINE_SERVICE_COLLECTOR_WEBAPP_HTTPS_ADDRESS =
       DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS;
 
+  /**
+   * Settings for NUMA awareness.
+   */
+  public static final String NM_NUMA_AWARENESS_ENABLED = NM_PREFIX
+      + "numa-awareness.enabled";
+  public static final boolean DEFAULT_NM_NUMA_AWARENESS_ENABLED = false;
+  public static final String NM_NUMA_AWARENESS_READ_TOPOLOGY = NM_PREFIX
+      + "numa-awareness.read-topology";
+  public static final boolean DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY = false;
+  public static final String NM_NUMA_AWARENESS_NODE_IDS = NM_PREFIX
+      + "numa-awareness.node-ids";
+  public static final String NM_NUMA_AWARENESS_NUMACTL_CMD = NM_PREFIX
+      + "numa-awareness.numactl.cmd";
+  public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD =
+      "/usr/bin/numactl";
+
   public YarnConfiguration() {
     super();
   }
@@ -3791,6 +3807,17 @@ public class YarnConfiguration extends Configuration {
         YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
   }
 
+  /**
+   * Returns whether the NUMA awareness is enabled.
+   *
+   * @param conf the configuration
+   * @return whether the NUMA awareness is enabled.
+   */
+  public static boolean numaAwarenessEnabled(Configuration conf) {
+    return conf.getBoolean(NM_NUMA_AWARENESS_ENABLED,
+        DEFAULT_NM_NUMA_AWARENESS_ENABLED);
+  }
+
   /* For debugging. mp configurations to system output as XML format. */
   public static void main(String[] args) throws Exception {
     new YarnConfiguration(new Configuration()).writeXml(System.out);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index adf8d8a..e192a0d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3711,4 +3711,55 @@
     <value></value>
   </property>
 
+  <property>
+    <description>
+    Whether to enable the NUMA awareness for containers in Node Manager.
+    </description>
+    <name>yarn.nodemanager.numa-awareness.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+    Whether to read the NUMA topology from the system or from the
+    configurations. If the value is true then NM reads the NUMA topology from
+    system using the command 'numactl --hardware'. If the value is false then 
NM
+    reads the topology from the configurations
+    'yarn.nodemanager.numa-awareness.node-ids'(for node id's),
+    'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.memory'(for each node 
memory),
+    'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.cpus'(for each node cpus).
+    </description>
+    <name>yarn.nodemanager.numa-awareness.read-topology</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+    NUMA node id's in the form of comma separated list. Memory and No of CPUs
+    will be read using the properties
+    'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.memory' and
+    'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.cpus' for each id 
specified
+    in this value. This property value will be read only when
+    'yarn.nodemanager.numa-awareness.read-topology=false'.
+
+    For example, if yarn.nodemanager.numa-awareness.node-ids=0,1
+    then need to specify memory and cpus for node id's '0' and '1' like below,
+    yarn.nodemanager.numa-awareness.0.memory=73717
+    yarn.nodemanager.numa-awareness.0.cpus=4
+    yarn.nodemanager.numa-awareness.1.memory=73727
+    yarn.nodemanager.numa-awareness.1.cpus=4
+    </description>
+    <name>yarn.nodemanager.numa-awareness.node-ids</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
+    The numactl command path which controls NUMA policy for processes or
+    shared memory.
+    </description>
+    <name>yarn.nodemanager.numa-awareness.numactl.cmd</name>
+    <value>/usr/bin/numactl</value>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index 44edc21..e92c4e2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -485,6 +485,7 @@ public class LinuxContainerExecutor extends 
ContainerExecutor {
             container.getResource());
     String resourcesOptions = resourcesHandler.getResourcesOption(containerId);
     String tcCommandFile = null;
+    List<String> numaArgs = null;
 
     try {
       if (resourceHandlerChain != null) {
@@ -506,6 +507,9 @@ public class LinuxContainerExecutor extends 
ContainerExecutor {
             case TC_MODIFY_STATE:
               tcCommandFile = op.getArguments().get(0);
               break;
+            case ADD_NUMA_PARAMS:
+              numaArgs = op.getArguments();
+              break;
             default:
               LOG.warn("PrivilegedOperation type unsupported in launch: "
                   + op.getOperationType());
@@ -536,7 +540,7 @@ public class LinuxContainerExecutor extends 
ContainerExecutor {
       if (pidFilePath != null) {
 
         ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
-            ctx, pidFilePath, resourcesOptions, tcCommandFile);
+            ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);
 
         linuxContainerRuntime.launchContainer(runtimeContext);
       } else {
@@ -610,11 +614,12 @@ public class LinuxContainerExecutor extends 
ContainerExecutor {
   }
 
   private ContainerRuntimeContext buildContainerRuntimeContext(
-      ContainerStartContext ctx, Path pidFilePath,
-      String resourcesOptions, String tcCommandFile) {
+      ContainerStartContext ctx, Path pidFilePath, String resourcesOptions,
+      String tcCommandFile, List<String> numaArgs) {
 
     List<String> prefixCommands = new ArrayList<>();
     addSchedPriorityCommand(prefixCommands);
+    addNumaArgsToCommand(prefixCommands, numaArgs);
 
     Container container = ctx.getContainer();
 
@@ -662,6 +667,13 @@ public class LinuxContainerExecutor extends 
ContainerExecutor {
     return linuxContainerRuntime.getIpAndHost(container);
   }
 
+  private void addNumaArgsToCommand(List<String> prefixCommands,
+      List<String> numaArgs) {
+    if (numaArgs != null) {
+      prefixCommands.addAll(numaArgs);
+    }
+  }
+
   @Override
   public int reacquireContainer(ContainerReacquisitionContext ctx)
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
index ad8c22f..189c0d0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
@@ -53,7 +53,8 @@ public class PrivilegedOperation {
     RUN_DOCKER_CMD("--run-docker"),
     GPU("--module-gpu"),
     FPGA("--module-fpga"),
-    LIST_AS_USER(""); //no CLI switch supported yet.
+    LIST_AS_USER(""), // no CLI switch supported yet.
+    ADD_NUMA_PARAMS(""); // no CLI switch supported yet.
 
     private final String option;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
index a02204d..fc55696 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceHandlerImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
@@ -253,6 +254,14 @@ public class ResourceHandlerModule {
     return cGroupsMemoryResourceHandler;
   }
 
+  private static ResourceHandler getNumaResourceHandler(Configuration conf,
+      Context nmContext) {
+    if (YarnConfiguration.numaAwarenessEnabled(conf)) {
+      return new NumaResourceHandlerImpl(conf, nmContext);
+    }
+    return null;
+  }
+
   private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
       ResourceHandler handler) {
     if (handler != null) {
@@ -273,6 +282,7 @@ public class ResourceHandlerModule {
         initMemoryResourceHandler(conf));
     addHandlerIfNotNull(handlerList,
         initCGroupsCpuResourceHandler(conf));
+    addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf, nmContext));
     addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
     resourceHandlerChain = new ResourceHandlerChain(handlerList);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaNodeResource.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaNodeResource.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaNodeResource.java
new file mode 100644
index 0000000..f434412
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaNodeResource.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * NumaNodeResource class holds the NUMA node topology with the total and used
+ * resources.
+ */
+public class NumaNodeResource {
+  private String nodeId;
+  private long totalMemory;
+  private int totalCpus;
+  private long usedMemory;
+  private int usedCpus;
+
+  private static final Log LOG = LogFactory.getLog(NumaNodeResource.class);
+
+  private Map<ContainerId, Long> containerVsMemUsage =
+      new ConcurrentHashMap<>();
+  private Map<ContainerId, Integer> containerVsCpusUsage =
+      new ConcurrentHashMap<>();
+
+  public NumaNodeResource(String nodeId, long totalMemory, int totalCpus) {
+    this.nodeId = nodeId;
+    this.totalMemory = totalMemory;
+    this.totalCpus = totalCpus;
+  }
+
+  /**
+   * Checks whether the specified resources available or not.
+   *
+   * @param resource resource
+   * @return whether the specified resources available or not
+   */
+  public boolean isResourcesAvailable(Resource resource) {
+    LOG.debug(
+        "Memory available:" + (totalMemory - usedMemory) + ", CPUs available:"
+            + (totalCpus - usedCpus) + ", requested:" + resource);
+    if ((totalMemory - usedMemory) >= resource.getMemorySize()
+        && (totalCpus - usedCpus) >= resource.getVirtualCores()) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Assigns available memory and returns the remaining needed memory.
+   *
+   * @param memreq required memory
+   * @param containerId which container memory to assign
+   * @return remaining needed memory
+   */
+  public long assignAvailableMemory(long memreq, ContainerId containerId) {
+    long memAvailable = totalMemory - usedMemory;
+    if (memAvailable >= memreq) {
+      containerVsMemUsage.put(containerId, memreq);
+      usedMemory += memreq;
+      return 0;
+    } else {
+      usedMemory += memAvailable;
+      containerVsMemUsage.put(containerId, memAvailable);
+      return memreq - memAvailable;
+    }
+  }
+
+  /**
+   * Assigns available cpu's and returns the remaining needed cpu's.
+   *
+   * @param cpusreq required cpu's
+   * @param containerId which container cpu's to assign
+   * @return remaining needed cpu's
+   */
+  public int assignAvailableCpus(int cpusreq, ContainerId containerId) {
+    int cpusAvailable = totalCpus - usedCpus;
+    if (cpusAvailable >= cpusreq) {
+      containerVsCpusUsage.put(containerId, cpusreq);
+      usedCpus += cpusreq;
+      return 0;
+    } else {
+      usedCpus += cpusAvailable;
+      containerVsCpusUsage.put(containerId, cpusAvailable);
+      return cpusreq - cpusAvailable;
+    }
+  }
+
+  /**
+   * Assigns the requested resources for Container.
+   *
+   * @param resource resource to assign
+   * @param containerId to which container the resources to assign
+   */
+  public void assignResources(Resource resource, ContainerId containerId) {
+    containerVsMemUsage.put(containerId, resource.getMemorySize());
+    containerVsCpusUsage.put(containerId, resource.getVirtualCores());
+    usedMemory += resource.getMemorySize();
+    usedCpus += resource.getVirtualCores();
+  }
+
+  /**
+   * Releases the assigned resources for Container.
+   *
+   * @param containerId to which container the assigned resources to release
+   */
+  public void releaseResources(ContainerId containerId) {
+    if (containerVsMemUsage.containsKey(containerId)) {
+      usedMemory -= containerVsMemUsage.get(containerId);
+      containerVsMemUsage.remove(containerId);
+    }
+    if (containerVsCpusUsage.containsKey(containerId)) {
+      usedCpus -= containerVsCpusUsage.get(containerId);
+      containerVsCpusUsage.remove(containerId);
+    }
+  }
+
+  /**
+   * Recovers the memory resources for Container.
+   *
+   * @param containerId recover the memory resources for the Container
+   * @param memory memory to recover
+   */
+  public void recoverMemory(ContainerId containerId, long memory) {
+    containerVsMemUsage.put(containerId, memory);
+    usedMemory += memory;
+  }
+
+  /**
+   * Recovers the cpu's resources for Container.
+   *
+   * @param containerId recover the cpu's resources for the Container
+   * @param cpus cpu's to recover
+   */
+  public void recoverCpus(ContainerId containerId, int cpus) {
+    containerVsCpusUsage.put(containerId, cpus);
+    usedCpus += cpus;
+  }
+
+  @Override
+  public String toString() {
+    return "Node Id:" + nodeId + "\tMemory:" + totalMemory + "\tCPus:"
+        + totalCpus;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+    result = prime * result + (int) (totalMemory ^ (totalMemory >>> 32));
+    result = prime * result + totalCpus;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    NumaNodeResource other = (NumaNodeResource) obj;
+    if (nodeId == null) {
+      if (other.nodeId != null) {
+        return false;
+      }
+    } else if (!nodeId.equals(other.nodeId)) {
+      return false;
+    }
+    if (totalMemory != other.totalMemory) {
+      return false;
+    }
+    if (totalCpus != other.totalCpus) {
+      return false;
+    }
+    return true;
+  }
+
+  public String getNodeId() {
+    return nodeId;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
new file mode 100644
index 0000000..f8d4739
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * NumaResourceAllocation contains Memory nodes and CPU nodes assigned to a
+ * container.
+ */
+public class NumaResourceAllocation implements Serializable {
+  private static final long serialVersionUID = 6339719798446595123L;
+  private Map<String, Long> nodeVsMemory;
+  private Map<String, Integer> nodeVsCpus;
+
+  public NumaResourceAllocation() {
+    nodeVsMemory = new HashMap<>();
+    nodeVsCpus = new HashMap<>();
+  }
+
+  public NumaResourceAllocation(String memNodeId, long memory, String 
cpuNodeId,
+      int cpus) {
+    this();
+    nodeVsMemory.put(memNodeId, memory);
+    nodeVsCpus.put(cpuNodeId, cpus);
+  }
+
+  public void addMemoryNode(String memNodeId, long memory) {
+    nodeVsMemory.put(memNodeId, memory);
+  }
+
+  public void addCpuNode(String cpuNodeId, int cpus) {
+    nodeVsCpus.put(cpuNodeId, cpus);
+  }
+
+  public Set<String> getMemNodes() {
+    return nodeVsMemory.keySet();
+  }
+
+  public Set<String> getCpuNodes() {
+    return nodeVsCpus.keySet();
+  }
+
+  public Map<String, Long> getNodeVsMemory() {
+    return nodeVsMemory;
+  }
+
+  public Map<String, Integer> getNodeVsCpus() {
+    return nodeVsCpus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
new file mode 100644
index 0000000..e152bda
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * NUMA Resources Allocator reads the NUMA topology and assigns NUMA nodes to
+ * the containers.
+ */
+public class NumaResourceAllocator {
+
+  private static final Log LOG = 
LogFactory.getLog(NumaResourceAllocator.class);
+
+  // Regex to find node ids, Ex: 'available: 2 nodes (0-1)'
+  private static final String NUMA_NODEIDS_REGEX =
+      "available:\\s*[0-9]+\\s*nodes\\s*\\(([0-9\\-,]*)\\)";
+
+  // Regex to find node memory, Ex: 'node 0 size: 73717 MB'
+  private static final String NUMA_NODE_MEMORY_REGEX =
+      "node\\s*<NUMA-NODE>\\s*size:\\s*([0-9]+)\\s*([KMG]B)";
+
+  // Regex to find node cpus, Ex: 'node 0 cpus: 0 2 4 6'
+  private static final String NUMA_NODE_CPUS_REGEX =
+      "node\\s*<NUMA-NODE>\\s*cpus:\\s*([0-9\\s]+)";
+
+  private static final String GB = "GB";
+  private static final String KB = "KB";
+  private static final String NUMA_NODE = "<NUMA-NODE>";
+  private static final String SPACE = "\\s";
+  private static final long DEFAULT_NUMA_NODE_MEMORY = 1024;
+  private static final int DEFAULT_NUMA_NODE_CPUS = 1;
+  private static final String NUMA_RESOURCE_TYPE = "numa";
+
+  private List<NumaNodeResource> numaNodesList = new ArrayList<>();
+  private Map<String, NumaNodeResource> numaNodeIdVsResource = new HashMap<>();
+  private int currentAssignNode;
+
+  private Context context;
+
+  public NumaResourceAllocator(Context context) {
+    this.context = context;
+  }
+
+  public void init(Configuration conf) throws YarnException {
+    if (conf.getBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY,
+        YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY)) {
+      LOG.info("Reading NUMA topology using 'numactl --hardware' command.");
+      String cmdOutput = executeNGetCmdOutput(conf);
+      String[] outputLines = cmdOutput.split("\\n");
+      Pattern pattern = Pattern.compile(NUMA_NODEIDS_REGEX);
+      String nodeIdsStr = null;
+      for (String line : outputLines) {
+        Matcher matcher = pattern.matcher(line);
+        if (matcher.find()) {
+          nodeIdsStr = matcher.group(1);
+          break;
+        }
+      }
+      if (nodeIdsStr == null) {
+        throw new YarnException("Failed to get numa nodes from"
+            + " 'numactl --hardware' output and output is:\n" + cmdOutput);
+      }
+      String[] nodeIdCommaSplits = nodeIdsStr.split("[,\\s]");
+      for (String nodeIdOrRange : nodeIdCommaSplits) {
+        if (nodeIdOrRange.contains("-")) {
+          String[] beginNEnd = nodeIdOrRange.split("-");
+          int endNode = Integer.parseInt(beginNEnd[1]);
+          for (int nodeId = Integer
+              .parseInt(beginNEnd[0]); nodeId <= endNode; nodeId++) {
+            long memory = parseMemory(outputLines, String.valueOf(nodeId));
+            int cpus = parseCpus(outputLines, String.valueOf(nodeId));
+            addToCollection(String.valueOf(nodeId), memory, cpus);
+          }
+        } else {
+          long memory = parseMemory(outputLines, nodeIdOrRange);
+          int cpus = parseCpus(outputLines, nodeIdOrRange);
+          addToCollection(nodeIdOrRange, memory, cpus);
+        }
+      }
+    } else {
+      LOG.info("Reading NUMA topology using configurations.");
+      Collection<String> nodeIds = conf
+          .getStringCollection(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS);
+      for (String nodeId : nodeIds) {
+        long mem = conf.getLong(
+            "yarn.nodemanager.numa-awareness." + nodeId + ".memory",
+            DEFAULT_NUMA_NODE_MEMORY);
+        int cpus = conf.getInt(
+            "yarn.nodemanager.numa-awareness." + nodeId + ".cpus",
+            DEFAULT_NUMA_NODE_CPUS);
+        addToCollection(nodeId, mem, cpus);
+      }
+    }
+    if (numaNodesList.isEmpty()) {
+      throw new YarnException("There are no available NUMA nodes"
+          + " for making containers NUMA aware.");
+    }
+    LOG.info("Available numa nodes with capacities : " + numaNodesList.size());
+  }
+
+  @VisibleForTesting
+  String executeNGetCmdOutput(Configuration conf) throws YarnException {
+    String numaCtlCmd = conf.get(
+        YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
+        YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
+    String[] args = new String[] {numaCtlCmd, "--hardware"};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(args);
+    try {
+      shExec.execute();
+    } catch (IOException e) {
+      throw new YarnException("Failed to read the numa configurations.", e);
+    }
+    return shExec.getOutput();
+  }
+
+  private int parseCpus(String[] outputLines, String nodeId) {
+    int cpus = 0;
+    Pattern patternNodeCPUs = Pattern
+        .compile(NUMA_NODE_CPUS_REGEX.replace(NUMA_NODE, nodeId));
+    for (String line : outputLines) {
+      Matcher matcherNodeCPUs = patternNodeCPUs.matcher(line);
+      if (matcherNodeCPUs.find()) {
+        String cpusStr = matcherNodeCPUs.group(1);
+        cpus = cpusStr.split(SPACE).length;
+        break;
+      }
+    }
+    return cpus;
+  }
+
+  private long parseMemory(String[] outputLines, String nodeId)
+      throws YarnException {
+    long memory = 0;
+    String units;
+    Pattern patternNodeMem = Pattern
+        .compile(NUMA_NODE_MEMORY_REGEX.replace(NUMA_NODE, nodeId));
+    for (String line : outputLines) {
+      Matcher matcherNodeMem = patternNodeMem.matcher(line);
+      if (matcherNodeMem.find()) {
+        try {
+          memory = Long.parseLong(matcherNodeMem.group(1));
+          units = matcherNodeMem.group(2);
+          if (GB.equals(units)) {
+            memory = memory * 1024;
+          } else if (KB.equals(units)) {
+            memory = memory / 1024;
+          }
+        } catch (Exception ex) {
+          throw new YarnException("Failed to get memory for node:" + nodeId,
+              ex);
+        }
+        break;
+      }
+    }
+    return memory;
+  }
+
+  private void addToCollection(String nodeId, long memory, int cpus) {
+    NumaNodeResource numaNode = new NumaNodeResource(nodeId, memory, cpus);
+    numaNodesList.add(numaNode);
+    numaNodeIdVsResource.put(nodeId, numaNode);
+  }
+
+  /**
+   * Allocates the available NUMA nodes for the requested containerId with
+   * resource in a round robin fashion.
+   *
+   * @param container the container to allocate NUMA resources
+   * @return the assigned NUMA Node info or null if resources not available.
+   * @throws ResourceHandlerException when failed to store NUMA resources
+   */
+  public synchronized NumaResourceAllocation allocateNumaNodes(
+      Container container) throws ResourceHandlerException {
+    NumaResourceAllocation allocation = allocate(container.getContainerId(),
+        container.getResource());
+    if (allocation != null) {
+      try {
+        // Update state store.
+        context.getNMStateStore().storeAssignedResources(container,
+            NUMA_RESOURCE_TYPE, Arrays.asList(allocation));
+      } catch (IOException e) {
+        releaseNumaResource(container.getContainerId());
+        throw new ResourceHandlerException(e);
+      }
+    }
+    return allocation;
+  }
+
+  private NumaResourceAllocation allocate(ContainerId containerId,
+      Resource resource) {
+    for (int index = 0; index < numaNodesList.size(); index++) {
+      NumaNodeResource numaNode = numaNodesList
+          .get((currentAssignNode + index) % numaNodesList.size());
+      if (numaNode.isResourcesAvailable(resource)) {
+        numaNode.assignResources(resource, containerId);
+        LOG.info("Assigning NUMA node " + numaNode.getNodeId() + " for memory, 
"
+            + numaNode.getNodeId() + " for cpus for the " + containerId);
+        currentAssignNode = (currentAssignNode + index + 1)
+            % numaNodesList.size();
+        return new NumaResourceAllocation(numaNode.getNodeId(),
+            resource.getMemorySize(), numaNode.getNodeId(),
+            resource.getVirtualCores());
+      }
+    }
+
+    // If there is no single node matched for the container resource
+    // Check the NUMA nodes for Memory resources
+    NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation();
+    long memreq = resource.getMemorySize();
+    for (NumaNodeResource numaNode : numaNodesList) {
+      long memrem = numaNode.assignAvailableMemory(memreq, containerId);
+      assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - 
memrem);
+      memreq = memrem;
+      if (memreq == 0) {
+        break;
+      }
+    }
+    if (memreq != 0) {
+      LOG.info("There is no available memory:" + resource.getMemorySize()
+          + " in numa nodes for " + containerId);
+      releaseNumaResource(containerId);
+      return null;
+    }
+
+    // Check the NUMA nodes for CPU resources
+    int cpusreq = resource.getVirtualCores();
+    for (int index = 0; index < numaNodesList.size(); index++) {
+      NumaNodeResource numaNode = numaNodesList
+          .get((currentAssignNode + index) % numaNodesList.size());
+      int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId);
+      assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem);
+      cpusreq = cpusrem;
+      if (cpusreq == 0) {
+        currentAssignNode = (currentAssignNode + index + 1)
+            % numaNodesList.size();
+        break;
+      }
+    }
+
+    if (cpusreq != 0) {
+      LOG.info("There are no available cpus:" + resource.getVirtualCores()
+          + " in numa nodes for " + containerId);
+      releaseNumaResource(containerId);
+      return null;
+    }
+    LOG.info("Assigning multiple NUMA nodes ("
+        + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes())
+        + ") for memory, ("
+        + StringUtils.join(",", assignedNumaNodeInfo.getCpuNodes())
+        + ") for cpus for " + containerId);
+    return assignedNumaNodeInfo;
+  }
+
+  /**
+   * Release assigned NUMA resources for the container.
+   *
+   * @param containerId the container ID
+   */
+  public synchronized void releaseNumaResource(ContainerId containerId) {
+    LOG.info("Releasing the assigned NUMA resources for " + containerId);
+    for (NumaNodeResource numaNode : numaNodesList) {
+      numaNode.releaseResources(containerId);
+    }
+  }
+
+  /**
+   * Recovers assigned numa resources.
+   *
+   * @param containerId the container ID to recover resources
+   */
+  public synchronized void recoverNumaResource(ContainerId containerId) {
+    Container container = context.getContainers().get(containerId);
+    ResourceMappings resourceMappings = container.getResourceMappings();
+    List<Serializable> assignedResources = resourceMappings
+        .getAssignedResources(NUMA_RESOURCE_TYPE);
+    if (assignedResources.size() == 1) {
+      NumaResourceAllocation numaResourceAllocation =
+          (NumaResourceAllocation) assignedResources.get(0);
+      for (Entry<String, Long> nodeAndMemory : numaResourceAllocation
+          .getNodeVsMemory().entrySet()) {
+        numaNodeIdVsResource.get(nodeAndMemory.getKey())
+            .recoverMemory(containerId, nodeAndMemory.getValue());
+      }
+      for (Entry<String, Integer> nodeAndCpus : numaResourceAllocation
+          .getNodeVsCpus().entrySet()) {
+        numaNodeIdVsResource.get(nodeAndCpus.getKey()).recoverCpus(containerId,
+            nodeAndCpus.getValue());
+      }
+    } else {
+      LOG.error("Unexpected number:" + assignedResources.size()
+          + " of assigned numa resources for " + containerId
+          + " while recovering.");
+    }
+  }
+
+  @VisibleForTesting
+  Collection<NumaNodeResource> getNumaNodesList() {
+    return numaNodesList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java
new file mode 100644
index 0000000..128daca
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation.OperationType;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+
+/**
+ * ResourceHandler implementation for allocating NUMA Resources to each
+ * container.
+ */
+public class NumaResourceHandlerImpl implements ResourceHandler {
+
+  private static final Log LOG = LogFactory
+      .getLog(NumaResourceHandlerImpl.class);
+  private NumaResourceAllocator numaResourceAllocator;
+  private String numaCtlCmd;
+
+  public NumaResourceHandlerImpl(Configuration conf, Context nmContext) {
+    LOG.info("NUMA resources allocation is enabled, initializing NUMA 
resources"
+        + " allocator.");
+    numaResourceAllocator = new NumaResourceAllocator(nmContext);
+    numaCtlCmd = conf.get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
+        YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
+  }
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException {
+    try {
+      numaResourceAllocator.init(configuration);
+    } catch (YarnException e) {
+      throw new ResourceHandlerException(e);
+    }
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+    List<PrivilegedOperation> ret = null;
+    NumaResourceAllocation numaAllocation = numaResourceAllocator
+        .allocateNumaNodes(container);
+    if (numaAllocation != null) {
+      ret = new ArrayList<>();
+      ArrayList<String> args = new ArrayList<>();
+      args.add(numaCtlCmd);
+      args.add(
+          "--interleave=" + String.join(",", numaAllocation.getMemNodes()));
+      args.add(
+          "--cpunodebind=" + String.join(",", numaAllocation.getCpuNodes()));
+      ret.add(new PrivilegedOperation(OperationType.ADD_NUMA_PARAMS, args));
+    }
+    return ret;
+  }
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    try {
+      numaResourceAllocator.recoverNumaResource(containerId);
+    } catch (Throwable e) {
+      throw new ResourceHandlerException(
+          "Failed to recover numa resource for " + containerId, e);
+    }
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    numaResourceAllocator.releaseNumaResource(containerId);
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/package-info.java
new file mode 100644
index 0000000..1540adf
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.
+ * resources.numa contains classes related to NM local scheduler allocators.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java
new file mode 100644
index 0000000..e1ba19c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+/**
+ * Test class for NumaResourceAllocator.
+ */
+public class TestNumaResourceAllocator {
+
+  private Configuration conf;
+  private NumaResourceAllocator numaResourceAllocator;
+
+  @Before
+  public void setUp() throws IOException, YarnException {
+    conf = new YarnConfiguration();
+    Context mockContext = mock(Context.class);
+    @SuppressWarnings("unchecked")
+    ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
+        ConcurrentHashMap.class);
+    Container mockContainer = mock(Container.class);
+    when(mockContainer.getResourceMappings())
+        .thenReturn(new ResourceMappings());
+    when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+    when(mockContext.getContainers()).thenReturn(mockContainers);
+    NMStateStoreService mock = mock(NMStateStoreService.class);
+    when(mockContext.getNMStateStore()).thenReturn(mock);
+    numaResourceAllocator = new NumaResourceAllocator(mockContext);
+    setNumaTopologyConfigs();
+    numaResourceAllocator.init(conf);
+  }
+
+  @Test
+  public void testReadNumaTopologyFromConfigurations() throws Exception {
+    Collection<NumaNodeResource> nodesList = numaResourceAllocator
+        .getNumaNodesList();
+    Collection<NumaNodeResource> expectedNodesList = 
getExpectedNumaNodesList();
+    Assert.assertEquals(expectedNodesList, nodesList);
+  }
+
+  @Test
+  public void testReadNumaTopologyFromCmdOutput() throws Exception {
+    conf.setBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, true);
+    String cmdOutput = "available: 2 nodes (0-1)\n\t"
+        + "node 0 cpus: 0 2 4 6\n\t"
+        + "node 0 size: 73717 MB\n\t"
+        + "node 0 free: 17272 MB\n\t"
+        + "node 1 cpus: 1 3 5 7\n\t"
+        + "node 1 size: 73727 MB\n\t"
+        + "node 1 free: 10699 MB\n\t"
+        + "node distances:\n\t"
+        + "node 0 1\n\t"
+        + "0: 10 20\n\t"
+        + "1: 20 10";
+    numaResourceAllocator = new NumaResourceAllocator(mock(Context.class)) {
+      @Override
+      String executeNGetCmdOutput(Configuration config)
+          throws YarnRuntimeException {
+        return cmdOutput;
+      }
+    };
+    numaResourceAllocator.init(conf);
+    Collection<NumaNodeResource> nodesList = numaResourceAllocator
+        .getNumaNodesList();
+    Collection<NumaNodeResource> expectedNodesList = 
getExpectedNumaNodesList();
+    Assert.assertEquals(expectedNodesList, nodesList);
+  }
+
+  @Test
+  public void testAllocateNumaNode() throws Exception {
+    NumaResourceAllocation nodeInfo = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000001"),
+            Resource.newInstance(2048, 2)));
+    Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+    Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
+  }
+
+  @Test
+  public void testAllocateNumaNodeWithRoundRobinFashionAssignment()
+      throws Exception {
+    NumaResourceAllocation nodeInfo1 = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000001"),
+            Resource.newInstance(2048, 2)));
+    Assert.assertEquals("0", String.join(",", nodeInfo1.getMemNodes()));
+    Assert.assertEquals("0", String.join(",", nodeInfo1.getCpuNodes()));
+
+    NumaResourceAllocation nodeInfo2 = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000002"),
+            Resource.newInstance(2048, 2)));
+    Assert.assertEquals("1", String.join(",", nodeInfo2.getMemNodes()));
+    Assert.assertEquals("1", String.join(",", nodeInfo2.getCpuNodes()));
+
+    NumaResourceAllocation nodeInfo3 = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000003"),
+            Resource.newInstance(2048, 2)));
+    Assert.assertEquals("0", String.join(",", nodeInfo3.getMemNodes()));
+    Assert.assertEquals("0", String.join(",", nodeInfo3.getCpuNodes()));
+
+    NumaResourceAllocation nodeInfo4 = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000003"),
+            Resource.newInstance(2048, 2)));
+    Assert.assertEquals("1", String.join(",", nodeInfo4.getMemNodes()));
+    Assert.assertEquals("1", String.join(",", nodeInfo4.getCpuNodes()));
+  }
+
+  @Test
+  public void testAllocateNumaNodeWithMultipleNodesForMemory()
+      throws Exception {
+    NumaResourceAllocation nodeInfo = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000001"),
+            Resource.newInstance(102400, 2)));
+    Assert.assertEquals("0,1", String.join(",", nodeInfo.getMemNodes()));
+    Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
+  }
+
+  @Test
+  public void testAllocateNumaNodeWithMultipleNodesForCpus() throws Exception {
+    NumaResourceAllocation nodeInfo = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000001"),
+            Resource.newInstance(2048, 6)));
+    Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+    Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes()));
+  }
+
+  @Test
+  public void testAllocateNumaNodeWhenNoNumaMemResourcesAvailable()
+      throws Exception {
+    NumaResourceAllocation nodeInfo = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000001"),
+            Resource.newInstance(2048000, 6)));
+    Assert.assertNull("Should not assign numa nodes when there"
+        + " are no sufficient memory resources available.", nodeInfo);
+  }
+
+  @Test
+  public void testAllocateNumaNodeWhenNoNumaCpuResourcesAvailable()
+      throws Exception {
+    NumaResourceAllocation nodeInfo = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000001"),
+            Resource.newInstance(2048, 600)));
+    Assert.assertNull("Should not assign numa nodes when there"
+        + " are no sufficient cpu resources available.", nodeInfo);
+  }
+
+  @Test
+  public void testReleaseNumaResourcess() throws Exception {
+    NumaResourceAllocation nodeInfo = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000001"),
+            Resource.newInstance(2048, 8)));
+    Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+    Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes()));
+
+    // Request the resource when all cpu nodes occupied
+    nodeInfo = numaResourceAllocator.allocateNumaNodes(getContainer(
+        ContainerId.fromString("container_1481156246874_0001_01_000002"),
+        Resource.newInstance(2048, 4)));
+    Assert.assertNull("Should not assign numa nodes when there"
+        + " are no sufficient cpu resources available.", nodeInfo);
+
+    // Release the resources
+    numaResourceAllocator.releaseNumaResource(
+        ContainerId.fromString("container_1481156246874_0001_01_000001"));
+    // Request the resources
+    nodeInfo = numaResourceAllocator.allocateNumaNodes(getContainer(
+        ContainerId.fromString("container_1481156246874_0001_01_000003"),
+        Resource.newInstance(1024, 2)));
+    Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
+    Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
+  }
+
+  @Test
+  public void testRecoverNumaResource() throws Exception {
+    @SuppressWarnings("unchecked")
+    ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
+        ConcurrentHashMap.class);
+    Context mockContext = mock(Context.class);
+    Container mockContainer = mock(Container.class);
+    ResourceMappings value = new ResourceMappings();
+    AssignedResources assignedResources = new AssignedResources();
+    assignedResources.updateAssignedResources(
+        Arrays.asList(new NumaResourceAllocation("0", 70000, "0", 4)));
+    value.addAssignedResources("numa", assignedResources);
+    when(mockContainer.getResourceMappings()).thenReturn(value);
+    when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+    when(mockContext.getContainers()).thenReturn(mockContainers);
+    NMStateStoreService mock = mock(NMStateStoreService.class);
+    when(mockContext.getNMStateStore()).thenReturn(mock);
+    numaResourceAllocator = new NumaResourceAllocator(mockContext);
+    numaResourceAllocator.init(conf);
+    // Recover the resources
+    numaResourceAllocator.recoverNumaResource(
+        ContainerId.fromString("container_1481156246874_0001_01_000001"));
+
+    // Request resources based on the availability
+    NumaResourceAllocation numaNode = numaResourceAllocator
+        .allocateNumaNodes(getContainer(
+            ContainerId.fromString("container_1481156246874_0001_01_000005"),
+            Resource.newInstance(2048, 1)));
+    assertEquals("1", String.join(",", numaNode.getMemNodes()));
+    assertEquals("1", String.join(",", numaNode.getCpuNodes()));
+
+    // Request resources more than the available
+    numaNode = numaResourceAllocator.allocateNumaNodes(getContainer(
+        ContainerId.fromString("container_1481156246874_0001_01_000006"),
+        Resource.newInstance(2048, 4)));
+    assertNull(numaNode);
+  }
+
+  private void setNumaTopologyConfigs() {
+    conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1");
+    conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717");
+    conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4");
+    conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727");
+    conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4");
+  }
+
+  private Collection<NumaNodeResource> getExpectedNumaNodesList() {
+    Collection<NumaNodeResource> expectedNodesList = new ArrayList<>(2);
+    expectedNodesList.add(new NumaNodeResource("0", 73717, 4));
+    expectedNodesList.add(new NumaNodeResource("1", 73727, 4));
+    return expectedNodesList;
+  }
+
+  private Container getContainer(ContainerId containerId, Resource resource) {
+    Container mockContainer = mock(Container.class);
+    when(mockContainer.getContainerId()).thenReturn(containerId);
+    when(mockContainer.getResource()).thenReturn(resource);
+    return mockContainer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a82d4a2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceHandlerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceHandlerImpl.java
new file mode 100644
index 0000000..341cbf0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceHandlerImpl.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+/**
+ * Test class for NumaResourceHandlerImpl.
+ *
+ */
+public class TestNumaResourceHandlerImpl {
+
+  private YarnConfiguration conf;
+  private NumaResourceHandlerImpl numaResourceHandler;
+  private Container mockContainer;
+
+  @Before
+  public void setUp() throws IOException, ResourceHandlerException {
+    conf = new YarnConfiguration();
+    setNumaTopologyConfigs();
+    Context mockContext = createAndGetMockContext();
+    NMStateStoreService mock = mock(NMStateStoreService.class);
+    when(mockContext.getNMStateStore()).thenReturn(mock);
+    numaResourceHandler = new NumaResourceHandlerImpl(conf, mockContext);
+    numaResourceHandler.bootstrap(conf);
+    mockContainer = mock(Container.class);
+  }
+
+  @Test
+  public void testAllocateNumaMemoryResource() throws ResourceHandlerException 
{
+    // allocates node 0 for memory and cpu
+    testAllocateNumaResource("container_1481156246874_0001_01_000001",
+        Resource.newInstance(2048, 2), "0", "0");
+    // allocates node 1 for memory and cpu since allocator uses round
+    // robin assignment
+    testAllocateNumaResource("container_1481156246874_0001_01_000002",
+        Resource.newInstance(60000, 2), "1", "1");
+    // allocates node 0,1 for memory since there is no sufficient memory in any
+    // one node
+    testAllocateNumaResource("container_1481156246874_0001_01_000003",
+        Resource.newInstance(80000, 2), "0,1", "0");
+    // returns null since there are no sufficient resources available for the
+    // request
+    when(mockContainer.getContainerId()).thenReturn(
+        ContainerId.fromString("container_1481156246874_0001_01_000004"));
+    when(mockContainer.getResource())
+        .thenReturn(Resource.newInstance(80000, 2));
+    assertNull(numaResourceHandler.preStart(mockContainer));
+    // allocates node 1 for memory and cpu
+    testAllocateNumaResource("container_1481156246874_0001_01_000005",
+        Resource.newInstance(1024, 2), "1", "1");
+  }
+
+  @Test
+  public void testAllocateNumaCpusResource() throws ResourceHandlerException {
+    // allocates node 0 for memory and cpu
+    testAllocateNumaResource("container_1481156246874_0001_01_000001",
+        Resource.newInstance(2048, 2), "0", "0");
+    // allocates node 1 for memory and cpu since allocator uses round
+    // robin assignment
+    testAllocateNumaResource("container_1481156246874_0001_01_000002",
+        Resource.newInstance(2048, 2), "1", "1");
+    // allocates node 0,1 for cpus since there is are no sufficient cpus
+    // available in any one node
+    testAllocateNumaResource("container_1481156246874_0001_01_000003",
+        Resource.newInstance(2048, 3), "0", "0,1");
+    // returns null since there are no sufficient resources available for the
+    // request
+    when(mockContainer.getContainerId()).thenReturn(
+        ContainerId.fromString("container_1481156246874_0001_01_000004"));
+    when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 
2));
+    assertNull(numaResourceHandler.preStart(mockContainer));
+    // allocates node 1 for memory and cpu
+    testAllocateNumaResource("container_1481156246874_0001_01_000005",
+        Resource.newInstance(2048, 1), "1", "1");
+  }
+
+  @Test
+  public void testReacquireContainer() throws Exception {
+    @SuppressWarnings("unchecked")
+    ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
+        ConcurrentHashMap.class);
+    Context mockContext = mock(Context.class);
+    NMStateStoreService mock = mock(NMStateStoreService.class);
+    when(mockContext.getNMStateStore()).thenReturn(mock);
+    ResourceMappings resourceMappings = new ResourceMappings();
+    AssignedResources assignedRscs = new AssignedResources();
+    NumaResourceAllocation numaResourceAllocation = new NumaResourceAllocation(
+        "0", 70000, "0", 4);
+    
assignedRscs.updateAssignedResources(Arrays.asList(numaResourceAllocation));
+    resourceMappings.addAssignedResources("numa", assignedRscs);
+    when(mockContainer.getResourceMappings()).thenReturn(resourceMappings);
+    when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+    when(mockContext.getContainers()).thenReturn(mockContainers);
+    numaResourceHandler = new NumaResourceHandlerImpl(conf, mockContext);
+    numaResourceHandler.bootstrap(conf);
+    // recovered numa resources should be added to the used resources and
+    // remaining will be available for further allocation.
+    numaResourceHandler.reacquireContainer(
+        ContainerId.fromString("container_1481156246874_0001_01_000001"));
+
+    testAllocateNumaResource("container_1481156246874_0001_01_000005",
+        Resource.newInstance(2048, 1), "1", "1");
+    when(mockContainer.getContainerId()).thenReturn(
+        ContainerId.fromString("container_1481156246874_0001_01_000005"));
+    when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 
4));
+    List<PrivilegedOperation> preStart = numaResourceHandler
+        .preStart(mockContainer);
+    assertNull(preStart);
+  }
+
+  private void setNumaTopologyConfigs() {
+    conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1");
+    conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717");
+    conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4");
+    conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727");
+    conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4");
+  }
+
+  private Context createAndGetMockContext() {
+    Context mockContext = mock(Context.class);
+    @SuppressWarnings("unchecked")
+    ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
+        ConcurrentHashMap.class);
+    mockContainer = mock(Container.class);
+    when(mockContainer.getResourceMappings())
+        .thenReturn(new ResourceMappings());
+    when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
+    when(mockContext.getContainers()).thenReturn(mockContainers);
+    return mockContext;
+  }
+
+  private void testAllocateNumaResource(String containerId, Resource resource,
+      String memNodes, String cpuNodes) throws ResourceHandlerException {
+    when(mockContainer.getContainerId())
+        .thenReturn(ContainerId.fromString(containerId));
+    when(mockContainer.getResource()).thenReturn(resource);
+    List<PrivilegedOperation> preStart = numaResourceHandler
+        .preStart(mockContainer);
+    List<String> arguments = preStart.get(0).getArguments();
+    assertEquals(arguments, Arrays.asList("/usr/bin/numactl",
+        "--interleave=" + memNodes, "--cpunodebind=" + cpuNodes));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to