Repository: hadoop
Updated Branches:
  refs/heads/YARN-3409 b80cb05d0 -> e0e507eb7


YARN-7871. Node attributes reporting from NM to RM. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0e507eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0e507eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0e507eb

Branch: refs/heads/YARN-3409
Commit: e0e507eb785692df541d813edf399c90e90a97fc
Parents: b80cb05
Author: Naganarasimha <naganarasimha...@apache.org>
Authored: Mon Mar 12 08:05:53 2018 +0800
Committer: Naganarasimha <naganarasimha...@apache.org>
Committed: Mon Mar 12 08:05:53 2018 +0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +-
 .../yarn/nodelabels/NodeAttributesManager.java  |  17 +-
 .../hadoop/yarn/nodelabels/NodeLabelUtil.java   |  19 ++
 .../src/main/resources/yarn-default.xml         |  24 +++
 .../yarn/server/nodemanager/NodeManager.java    |  70 +++++--
 .../server/nodemanager/NodeStatusUpdater.java   |  14 ++
 .../nodemanager/NodeStatusUpdaterImpl.java      |  70 ++++++-
 .../ConfigurationNodeAttributesProvider.java    |  90 +++++++++
 .../server/nodemanager/TestNodeManager.java     |   2 +-
 .../TestNodeStatusUpdaterForLabels.java         |  10 +-
 ...TestConfigurationNodeAttributesProvider.java | 185 +++++++++++++++++++
 .../resourcemanager/ResourceTrackerService.java |  30 +++
 .../nodelabels/NodeAttributesManagerImpl.java   |  52 ++++--
 .../TestResourceTrackerService.java             |  78 ++++++++
 .../nodelabels/TestNodeAttributesManager.java   |  99 ++++++++++
 15 files changed, 718 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 54cbf8b..83da58d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3343,9 +3343,12 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
       NM_NODE_LABELS_PREFIX + "provider";
 
+  public static final String NM_NODE_ATTRIBUTES_PROVIDER_CONFIG =
+      NM_NODE_ATTRIBUTES_PREFIX + "provider";
+
   // whitelist names for the yarn.nodemanager.node-labels.provider
-  public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
-  public static final String SCRIPT_NODE_LABELS_PROVIDER = "script";
+  public static final String CONFIG_NODE_DESCRIPTOR_PROVIDER = "config";
+  public static final String SCRIPT_NODE_DESCRIPTOR_PROVIDER = "script";
 
   private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
       NM_NODE_LABELS_PREFIX + "provider.";
@@ -3377,6 +3380,9 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_PROVIDER_CONFIGURED_NODE_PARTITION =
       NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-partition";
 
+  public static final String NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES =
+      NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "configured-node-attributes";
+
   private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
       + "node-labels.";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
index effda9b..ffa33cf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
@@ -35,15 +35,18 @@ public abstract class NodeAttributesManager extends 
AbstractService {
 
   /**
    * To completely replace the mappings for a given node with the new Set of
-   * Attributes. If the mapping contains an attribute whose type does not match
-   * a previously existing Attribute under the same prefix (name space) then
-   * exception is thrown. Key would be name of the node and value would be set
-   * of Attributes to be mapped.
+   * Attributes which are under a given prefix. If the mapping contains an
+   * attribute whose type does not match a previously existing Attribute
+   * under the same prefix (name space) then exception is thrown.
+   * Key would be name of the node and value would be set of Attributes to
+   * be mapped. If the prefix is null, then all node attributes will be
+   * replaced regardless of what prefix they have.
    *
-   * @param nodeAttributeMapping
-   * @throws IOException
+   * @param prefix node attribute prefix
+   * @param nodeAttributeMapping host name to a set of node attributes mapping
+   * @throws IOException if failed to replace attributes
    */
-  public abstract void replaceNodeAttributes(
+  public abstract void replaceNodeAttributes(String prefix,
       Map<String, Set<NodeAttribute>> nodeAttributeMapping) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
index fdfd0ce..93a27a9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import java.io.IOException;
 import java.util.Set;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Utility class for all NodeLabel and NodeAttribute operations.
@@ -125,4 +126,22 @@ public final class NodeLabelUtil {
       }
     }
   }
+
+  /**
+   * Filter a set of node attributes by a given prefix. Returns a filtered
+   * set of node attributes whose prefix equals the given prefix.
+   * If the prefix is null or empty, then the original set is returned.
+   * @param attributeSet node attribute set
+   * @param prefix node attribute prefix
+   * @return a filtered set of node attributes
+   */
+  public static Set<NodeAttribute> filterAttributesByPrefix(
+      Set<NodeAttribute> attributeSet, String prefix) {
+    if (Strings.isNullOrEmpty(prefix)) {
+      return attributeSet;
+    }
+    return attributeSet.stream().filter(
+        nodeAttribute -> prefix.equals(nodeAttribute.getAttributePrefix()))
+        .collect(Collectors.toSet());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 684674b..8c514ff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2775,6 +2775,20 @@
   <!-- Distributed Node Attributes Configuration -->
   <property>
     <description>
+      This property determines which provider will be plugged by the
+      node manager to collect node-attributes. Administrators can
+      configure "config", "script" or the class name of the provider.
+      Configured class needs to extend
+      
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider.
+      If "config" is configured, then "ConfigurationNodeLabelsProvider" and if
+      "script" is configured, then "ScriptBasedNodeAttributesProvider"
+      will be used.
+    </description>
+    <name>yarn.nodemanager.node-attributes.provider</name>
+  </property>
+
+  <property>
+    <description>
       The node attribute script NM runs to collect node attributes.
       Script output Line starting with "NODE_ATTRIBUTE:" will be
       considered as a record of node attribute, attribute name, type
@@ -2812,6 +2826,16 @@
 
   <property>
     <description>
+      When "yarn.nodemanager.node-attributes.provider" is configured with
+      "config" then ConfigurationNodeAttributesProvider fetches node attributes
+      from this parameter.
+    </description>
+    
<name>yarn.nodemanager.node-attributes.provider.configured-node-attributes</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
     Timeout in seconds for YARN node graceful decommission.
     This is the maximal time to wait for running containers and applications 
to complete
     before transition a DECOMMISSIONING node into DECOMMISSIONED.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 5cacd20..30dc125 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -63,6 +63,9 @@ import 
org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider;
+import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeAttributesProvider;
+import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
+import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeAttributesProvider;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -120,6 +123,7 @@ public class NodeManager extends CompositeService
   private ApplicationACLsManager aclsManager;
   private NodeHealthCheckerService nodeHealthChecker;
   private NodeLabelsProvider nodeLabelsProvider;
+  private NodeAttributesProvider nodeAttributesProvider;
   private LocalDirsHandlerService dirsHandler;
   private Context context;
   private AsyncDispatcher dispatcher;
@@ -157,14 +161,45 @@ public class NodeManager extends CompositeService
   protected NodeStatusUpdater createNodeStatusUpdater(Context context,
       Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
     return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
-        metrics, nodeLabelsProvider);
+        metrics);
   }
 
-  protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-      Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
-      NodeLabelsProvider nodeLabelsProvider) {
-    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
-        metrics, nodeLabelsProvider);
+  protected NodeAttributesProvider createNodeAttributesProvider(
+      Configuration conf) throws IOException {
+    NodeAttributesProvider attributesProvider = null;
+    String providerString =
+        conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null);
+    if (providerString == null || providerString.trim().length() == 0) {
+      return attributesProvider;
+    }
+    switch (providerString.trim().toLowerCase()) {
+    case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
+      attributesProvider = new ConfigurationNodeAttributesProvider();
+      break;
+    case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
+      attributesProvider = new ScriptBasedNodeAttributesProvider();
+      break;
+    default:
+      try {
+        Class<? extends NodeAttributesProvider> labelsProviderClass =
+            conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG,
+                null, NodeAttributesProvider.class);
+        attributesProvider = labelsProviderClass.newInstance();
+      } catch (InstantiationException | IllegalAccessException
+          | RuntimeException e) {
+        LOG.error("Failed to create NodeAttributesProvider"
+                + " based on Configuration", e);
+        throw new IOException(
+            "Failed to create NodeAttributesProvider : "
+                + e.getMessage(), e);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Distributed Node Attributes is enabled"
+          + " with provider class as : "
+          + attributesProvider.getClass().toString());
+    }
+    return attributesProvider;
   }
 
   protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@@ -177,10 +212,10 @@ public class NodeManager extends CompositeService
       return provider;
     }
     switch (providerString.trim().toLowerCase()) {
-    case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
+    case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
       provider = new ConfigurationNodeLabelsProvider();
       break;
-    case YarnConfiguration.SCRIPT_NODE_LABELS_PROVIDER:
+    case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
       provider = new ScriptBasedNodeLabelsProvider();
       break;
     default:
@@ -402,16 +437,19 @@ public class NodeManager extends CompositeService
     ((NMContext)context).setContainerExecutor(exec);
     ((NMContext)context).setDeletionService(del);
 
-    nodeLabelsProvider = createNodeLabelsProvider(conf);
+    nodeStatusUpdater =
+        createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
 
-    if (null == nodeLabelsProvider) {
-      nodeStatusUpdater =
-          createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
-    } else {
+    nodeLabelsProvider = createNodeLabelsProvider(conf);
+    if (nodeLabelsProvider != null) {
       addIfService(nodeLabelsProvider);
-      nodeStatusUpdater =
-          createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
-              nodeLabelsProvider);
+      nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
+    }
+
+    nodeAttributesProvider = createNodeAttributesProvider(conf);
+    if (nodeAttributesProvider != null) {
+      addIfService(nodeAttributesProvider);
+      nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
     }
 
     nodeResourceMonitor = createNodeResourceMonitor();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
index 08892d2..142cbbc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 
 public interface NodeStatusUpdater extends Service {
 
@@ -59,4 +61,16 @@ public interface NodeStatusUpdater extends Service {
    * @param ex exception that makes the node unhealthy
    */
   void reportException(Exception ex);
+
+  /**
+   * Sets a node attributes provider to node manager.
+   * @param provider
+   */
+  void setNodeAttributesProvider(NodeAttributesProvider provider);
+
+  /**
+   * Sets a node labels provider to the node manager.
+   * @param provider
+   */
+  void setNodeLabelsProvider(NodeLabelsProvider provider);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index df3fbac..70a253f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -85,6 +86,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import 
org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
@@ -152,21 +154,16 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
   Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
 
   private NMNodeLabelsHandler nodeLabelsHandler;
-  private final NodeLabelsProvider nodeLabelsProvider;
+  private NMNodeAttributesHandler nodeAttributesHandler;
+  private NodeLabelsProvider nodeLabelsProvider;
+  private NodeAttributesProvider nodeAttributesProvider;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
-    this(context, dispatcher, healthChecker, metrics, null);
-  }
-
-  public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
-      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
-      NodeLabelsProvider nodeLabelsProvider) {
     super(NodeStatusUpdaterImpl.class.getName());
     this.healthChecker = healthChecker;
     this.context = context;
     this.dispatcher = dispatcher;
-    this.nodeLabelsProvider = nodeLabelsProvider;
     this.metrics = metrics;
     this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
     this.pendingCompletedContainers =
@@ -176,6 +173,16 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
   }
 
   @Override
+  public void setNodeAttributesProvider(NodeAttributesProvider provider) {
+    this.nodeAttributesProvider = provider;
+  }
+
+  @Override
+  public void setNodeLabelsProvider(NodeLabelsProvider provider) {
+    this.nodeLabelsProvider = provider;
+  }
+
+  @Override
   protected void serviceInit(Configuration conf) throws Exception {
     this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf);
     long memoryMb = totalResource.getMemorySize();
@@ -214,7 +221,11 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
         YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
         YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
 
-    nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
+    nodeLabelsHandler =
+        createNMNodeLabelsHandler(nodeLabelsProvider);
+    nodeAttributesHandler =
+        createNMNodeAttributesHandler(nodeAttributesProvider);
+
     // Default duration to track stopped containers on nodemanager is 10Min.
     // This should not be assigned very large value as it will remember all the
     // containers stopped during that time.
@@ -842,6 +853,43 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
     }
   }
 
+  /**
+   * Returns a handler based on the configured node attributes provider.
+   * returns null if no provider is configured.
+   * @param provider
+   * @return attributes handler
+   */
+  private NMNodeAttributesHandler createNMNodeAttributesHandler(
+      NodeAttributesProvider provider) {
+    return provider == null ? null :
+        new NMDistributedNodeAttributesHandler(nodeAttributesProvider);
+  }
+
+  private interface NMNodeAttributesHandler {
+
+    /**
+     * @return the node attributes of this node manager.
+     */
+    Set<NodeAttribute> getNodeAttributesForHeartbeat();
+  }
+
+  private static class NMDistributedNodeAttributesHandler
+      implements NMNodeAttributesHandler {
+
+    private final NodeAttributesProvider attributesProvider;
+
+    protected NMDistributedNodeAttributesHandler(
+        NodeAttributesProvider provider) {
+      this.attributesProvider = provider;
+    }
+
+    @Override
+    public Set<NodeAttribute> getNodeAttributesForHeartbeat() {
+      return attributesProvider.getDescriptors();
+    }
+  }
+
+
   private static interface NMNodeLabelsHandler {
     /**
      * validates nodeLabels From Provider and returns it to the caller. Also
@@ -1057,6 +1105,9 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
           NodeHeartbeatResponse response = null;
           Set<NodeLabel> nodeLabelsForHeartbeat =
               nodeLabelsHandler.getNodeLabelsForHeartbeat();
+          Set<NodeAttribute> nodeAttributesForHeartbeat =
+              nodeAttributesHandler == null ? null :
+                  nodeAttributesHandler.getNodeAttributesForHeartbeat();
           NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
           NodeHeartbeatRequest request =
               NodeHeartbeatRequest.newInstance(nodeStatus,
@@ -1065,6 +1116,7 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
                   NodeStatusUpdaterImpl.this.context
                       .getNMTokenSecretManager().getCurrentKey(),
                   nodeLabelsForHeartbeat,
+                  nodeAttributesForHeartbeat,
                   NodeStatusUpdaterImpl.this.context
                       .getRegisteringCollectors());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
new file mode 100644
index 0000000..74341eb
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.TimerTask;
+import java.util.Set;
+
+/**
+ * Configuration based node attributes provider.
+ */
+public class ConfigurationNodeAttributesProvider
+    extends NodeAttributesProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class);
+
+  public ConfigurationNodeAttributesProvider() {
+    super("Configuration Based Node Attributes Provider");
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    long taskInterval = conf.getLong(YarnConfiguration
+            .NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS,
+        YarnConfiguration
+            .DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS);
+    this.setIntervalTime(taskInterval);
+    super.serviceInit(conf);
+  }
+
+  private void updateNodeAttributesFromConfig(Configuration conf)
+      throws IOException {
+    String configuredNodeAttributes = conf.get(
+        YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, null);
+    setDescriptors(parseAttributes(configuredNodeAttributes));
+  }
+
+  // TODO parse attributes from configuration
+  @VisibleForTesting
+  public Set<NodeAttribute> parseAttributes(String config)
+      throws IOException {
+    return new HashSet<>();
+  }
+
+  private class ConfigurationMonitorTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      try {
+        updateNodeAttributesFromConfig(new YarnConfiguration());
+      } catch (Exception e) {
+        LOG.error("Failed to update node attributes from "
+            + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, e);
+      }
+    }
+  }
+
+  @Override
+  protected void cleanUp() throws Exception {
+    // Nothing to cleanup
+  }
+
+  @Override
+  public TimerTask createTimerTask() {
+    return new ConfigurationMonitorTimerTask();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
index b31215b..b2c2f6e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
@@ -160,7 +160,7 @@ public class TestNodeManager {
 
       // With valid whitelisted configurations
       conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
-          YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER);
+          YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER);
       labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
       Assert.assertNotNull("LabelsProviderService should be initialized When "
           + "node labels provider class is configured", labelsProviderService);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
index 7ef23cb..3e2d963 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -225,11 +225,10 @@ public class TestNodeStatusUpdaterForLabels extends 
NodeLabelTestBase {
 
       @Override
       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
-          NodeLabelsProvider labelsProvider) {
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
 
         return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
-            metrics, labelsProvider) {
+            metrics) {
           @Override
           protected ResourceTracker getRMClient() {
             return resourceTracker;
@@ -325,11 +324,10 @@ public class TestNodeStatusUpdaterForLabels extends 
NodeLabelTestBase {
 
       @Override
       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
-          NodeLabelsProvider labelsProvider) {
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
 
         return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
-            metrics, labelsProvider) {
+            metrics) {
           @Override
           protected ResourceTracker getRMClient() {
             return resourceTracker;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
new file mode 100644
index 0000000..54cc8f0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test class for node configuration node attributes provider.
+ */
+public class TestConfigurationNodeAttributesProvider {
+
+  private static File testRootDir = new File("target",
+      TestConfigurationNodeAttributesProvider.class.getName() + "-localDir")
+      .getAbsoluteFile();
+
+  private ConfigurationNodeAttributesProvider nodeAttributesProvider;
+
+  @BeforeClass
+  public static void create() {
+    testRootDir.mkdirs();
+  }
+
+  @Before
+  public void setup() {
+    nodeAttributesProvider = new ConfigurationNodeAttributesProvider();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (nodeAttributesProvider != null) {
+      nodeAttributesProvider.close();
+      nodeAttributesProvider.stop();
+    }
+  }
+
+  @AfterClass
+  public static void remove() throws Exception {
+    if (testRootDir.exists()) {
+      FileContext.getLocalFSFileContext()
+          .delete(new Path(testRootDir.getAbsolutePath()), true);
+    }
+  }
+
+  @Test(timeout=30000L)
+  public void testNodeAttributesFetchInterval()
+      throws IOException, InterruptedException {
+    Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
+    expectedAttributes1.add(NodeAttribute
+        .newInstance("test.io", "host",
+            NodeAttributeType.STRING, "host1"));
+
+    Configuration conf = new Configuration();
+    // Set fetch interval to 1s for testing
+    conf.setLong(
+        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, 1000);
+    ConfigurationNodeAttributesProvider spyProvider =
+        Mockito.spy(nodeAttributesProvider);
+    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+        .thenReturn(expectedAttributes1);
+
+    spyProvider.init(conf);
+    spyProvider.start();
+
+    // Verify init value is honored.
+    Assert.assertEquals(expectedAttributes1, spyProvider.getDescriptors());
+
+    // Configuration provider provides a different set of attributes.
+    Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
+    expectedAttributes2.add(NodeAttribute
+        .newInstance("test.io", "os",
+            NodeAttributeType.STRING, "windows"));
+    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+        .thenReturn(expectedAttributes2);
+
+    // Since we set fetch interval to 1s, it needs to wait for 1s until
+    // the updated attributes is updated to the provider. So we are expecting
+    // to see some old values for a short window.
+    ArrayList<String> keysMet = new ArrayList<>();
+    int numOfOldValue = 0;
+    int numOfNewValue = 0;
+    // Run 5 times in 500ms interval
+    int times=5;
+    while(times>0) {
+      Set<NodeAttribute> current = spyProvider.getDescriptors();
+      Assert.assertEquals(1, current.size());
+      String attributeName = current.iterator().next().getAttributeName();
+      if ("host".equals(attributeName)){
+        numOfOldValue++;
+      } else if ("os".equals(attributeName)) {
+        numOfNewValue++;
+      }
+      Thread.sleep(500);
+      times--;
+    }
+    // We should either see the old value or the new value.
+    Assert.assertEquals(5, numOfNewValue + numOfOldValue);
+    // Both values should be more than 0.
+    Assert.assertTrue(numOfOldValue > 0);
+    Assert.assertTrue(numOfNewValue > 0);
+  }
+
+  @Test
+  public void testDisableFetchNodeAttributes() throws IOException,
+      InterruptedException {
+    Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
+    expectedAttributes1.add(NodeAttribute
+        .newInstance("test.io", "host",
+            NodeAttributeType.STRING, "host1"));
+
+    Configuration conf = new Configuration();
+    // Set fetch interval to -1 to disable refresh.
+    conf.setLong(
+        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1);
+    ConfigurationNodeAttributesProvider spyProvider =
+        Mockito.spy(nodeAttributesProvider);
+    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+        .thenReturn(expectedAttributes1);
+    spyProvider.init(conf);
+    spyProvider.start();
+
+    Assert.assertEquals(expectedAttributes1,
+        spyProvider.getDescriptors());
+
+    // The configuration added another attribute,
+    // as we disabled the fetch interval, this value cannot be
+    // updated to the provider.
+    Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
+    expectedAttributes2.add(NodeAttribute
+        .newInstance("test.io", "os",
+            NodeAttributeType.STRING, "windows"));
+    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+        .thenReturn(expectedAttributes2);
+
+    // Wait a few seconds until we get the value update, expecting a failure.
+    try {
+      GenericTestUtils.waitFor(() -> {
+        Set<NodeAttribute> attributes = spyProvider.getDescriptors();
+        return "os".equalsIgnoreCase(attributes
+            .iterator().next().getAttributeName());
+      }, 500, 1000);
+    } catch (Exception e) {
+      // Make sure we get the timeout exception.
+      Assert.assertTrue(e instanceof TimeoutException);
+      return;
+    }
+
+    Assert.fail("Expecting a failure in previous check!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index a42d053..487e73c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -611,6 +613,34 @@ public class ResourceTrackerService extends 
AbstractService implements
           this.rmContext.getNodeManagerQueueLimitCalculator()
               .createContainerQueuingLimit());
     }
+
+    // 8. Get node's attributes and update node-to-attributes mapping
+    // in RMNodeAttributeManager.
+    Set<NodeAttribute> nodeAttributes = request.getNodeAttributes();
+    if (nodeAttributes != null && !nodeAttributes.isEmpty()) {
+      nodeAttributes.forEach(nodeAttribute ->
+          LOG.debug(nodeId.toString() + " ATTRIBUTE : "
+              + nodeAttribute.toString()));
+
+      // Validate attributes
+      if (!nodeAttributes.stream().allMatch(
+          nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED
+              .equals(nodeAttribute.getAttributePrefix()))) {
+        // All attributes must be in same prefix: nm.yarn.io.
+        // Since we have the checks in NM to make sure attributes reported
+        // in HB are with correct prefix, so it should not reach here.
+        LOG.warn("Reject invalid node attributes from host: "
+            + nodeId.toString() + ", attributes in HB must have prefix "
+            + NodeAttribute.PREFIX_DISTRIBUTED);
+      } else {
+        // Replace all distributed node attributes associated with this host
+        // with the new reported attributes in node attribute manager.
+        this.rmContext.getNodeAttributesManager()
+            .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
+                ImmutableMap.of(nodeId.getHost(), nodeAttributes));
+      }
+    }
+
     return nodeHeartBeatResponse;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index a902ac6..04d74a8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import com.google.common.base.Strings;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -126,7 +127,8 @@ public class NodeAttributesManagerImpl extends 
NodeAttributesManager {
   private void internalUpdateAttributesOnNodes(
       Map<String, Map<NodeAttribute, AttributeValue>> nodeAttributeMapping,
       AttributeMappingOperationType op,
-      Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded) {
+      Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded,
+      String attributePrefix) {
     try {
       writeLock.lock();
 
@@ -156,8 +158,9 @@ public class NodeAttributesManagerImpl extends 
NodeAttributesManager {
           break;
         case REPLACE:
           clusterAttributes.putAll(newAttributesToBeAdded);
-          replaceNodeToAttribute(nodeHost, node.getAttributes(), attributes);
-          node.replaceAttributes(attributes);
+          replaceNodeToAttribute(nodeHost, attributePrefix,
+              node.getAttributes(), attributes);
+          node.replaceAttributes(attributes, attributePrefix);
           break;
         default:
           break;
@@ -199,15 +202,23 @@ public class NodeAttributesManagerImpl extends 
NodeAttributesManager {
   private void addNodeToAttribute(String nodeHost,
       Map<NodeAttribute, AttributeValue> attributeMappings) {
     for (NodeAttribute attribute : attributeMappings.keySet()) {
-      clusterAttributes.get(attribute).addNode(nodeHost);
+      RMNodeAttribute rmNodeAttribute = clusterAttributes.get(attribute);
+      if (rmNodeAttribute != null) {
+        rmNodeAttribute.addNode(nodeHost);
+      } else {
+        clusterAttributes.put(attribute, new RMNodeAttribute(attribute));
+      }
     }
   }
 
-  private void replaceNodeToAttribute(String nodeHost,
+  private void replaceNodeToAttribute(String nodeHost, String prefix,
       Map<NodeAttribute, AttributeValue> oldAttributeMappings,
       Map<NodeAttribute, AttributeValue> newAttributeMappings) {
     if (oldAttributeMappings != null) {
-      removeNodeFromAttributes(nodeHost, oldAttributeMappings.keySet());
+      Set<NodeAttribute> toRemoveAttributes =
+          NodeLabelUtil.filterAttributesByPrefix(
+              oldAttributeMappings.keySet(), prefix);
+      removeNodeFromAttributes(nodeHost, toRemoveAttributes);
     }
     addNodeToAttribute(nodeHost, newAttributeMappings);
   }
@@ -432,8 +443,19 @@ public class NodeAttributesManagerImpl extends 
NodeAttributesManager {
     }
 
     public void replaceAttributes(
-        Map<NodeAttribute, AttributeValue> attributesMapping) {
-      this.attributes.clear();
+        Map<NodeAttribute, AttributeValue> attributesMapping, String prefix) {
+      if (Strings.isNullOrEmpty(prefix)) {
+        this.attributes.clear();
+      } else {
+        Iterator<Entry<NodeAttribute, AttributeValue>> it =
+            this.attributes.entrySet().iterator();
+        while (it.hasNext()) {
+          Entry<NodeAttribute, AttributeValue> current = it.next();
+          if (prefix.equals(current.getKey().getAttributePrefix())) {
+            it.remove();
+          }
+        }
+      }
       this.attributes.putAll(attributesMapping);
     }
 
@@ -506,9 +528,10 @@ public class NodeAttributesManagerImpl extends 
NodeAttributesManager {
   }
 
   @Override
-  public void replaceNodeAttributes(
+  public void replaceNodeAttributes(String prefix,
       Map<String, Set<NodeAttribute>> nodeAttributeMapping) throws IOException 
{
-    processMapping(nodeAttributeMapping, 
AttributeMappingOperationType.REPLACE);
+    processMapping(nodeAttributeMapping,
+        AttributeMappingOperationType.REPLACE, prefix);
   }
 
   @Override
@@ -526,12 +549,19 @@ public class NodeAttributesManagerImpl extends 
NodeAttributesManager {
   private void processMapping(
       Map<String, Set<NodeAttribute>> nodeAttributeMapping,
       AttributeMappingOperationType mappingType) throws IOException {
+    processMapping(nodeAttributeMapping, mappingType, null);
+  }
+
+  private void processMapping(
+      Map<String, Set<NodeAttribute>> nodeAttributeMapping,
+      AttributeMappingOperationType mappingType, String attributePrefix)
+      throws IOException {
     Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded =
         new HashMap<>();
     Map<String, Map<NodeAttribute, AttributeValue>> validMapping =
         validate(nodeAttributeMapping, newAttributesToBeAdded, false);
 
     internalUpdateAttributesOnNodes(validMapping, mappingType,
-        newAttributesToBeAdded);
+        newAttributesToBeAdded, attributePrefix);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index fc6326e..4fd62b8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -37,6 +37,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -64,12 +65,16 @@ import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -819,6 +824,79 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
   }
 
   @Test
+  public void testNodeHeartbeatWithNodeAttributes() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    // Register to RM
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    Set<NodeAttribute> nodeAttributes = new HashSet<>();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host2"));
+
+    // Set node attributes in HB.
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
+    int responseId = nodeStatusObject.getResponseId();
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Ensure RM gets correct node attributes update.
+    NodeAttributesManager attributeManager =
+        rm.getRMContext().getNodeAttributesManager();
+    Map<NodeAttribute, AttributeValue> attrs = attributeManager
+        .getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(1, attrs.size());
+    NodeAttribute na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeName());
+    Assert.assertEquals("host2", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+
+
+    // Send another HB to RM with updated node atrribute
+    nodeAttributes.clear();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host3"));
+    nodeStatusObject = getNodeStatusObject(nodeId);
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM gets the updated attribute
+    attrs = attributeManager.getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(1, attrs.size());
+    na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeName());
+    Assert.assertEquals("host3", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+  }
+
+  @Test
   public void testNodeHeartBeatWithInvalidLabels() throws Exception {
     writeToHostsFile("host2");
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0e507eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
index b639a74..07968d4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.nodelabels.AttributeValue;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
@@ -255,4 +257,101 @@ public class TestNodeAttributesManager {
         .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
     Assert.assertEquals(2, allAttributesPerPrefix.size());
   }
+
+  @Test
+  public void testReplaceNodeAttributes() throws IOException {
+    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+    Map<String, Set<NodeAttribute>> toReplaceMap = new HashMap<>();
+    Map<NodeAttribute, AttributeValue> nodeAttributes;
+    Set<NodeAttribute> filteredAttributes;
+    Set<NodeAttribute> clusterAttributes;
+
+    // Add 3 attributes to host1
+    //  yarn.test1.io/A1=host1_v1_1
+    //  yarn.test1.io/A2=host1_v1_2
+    //  yarn.test1.io/A3=host1_v1_3
+    toAddAttributes.put(HOSTNAMES[0],
+        createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1"));
+
+    attributesManager.addNodeAttributes(toAddAttributes);
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(3, nodeAttributes.size());
+
+    // Add 10 distributed node attributes to host1
+    //  nn.yarn.io/dist-node-attribute1=dist_v1_1
+    //  nn.yarn.io/dist-node-attribute2=dist_v1_2
+    //  ...
+    //  nn.yarn.io/dist-node-attribute10=dist_v1_10
+    toAddAttributes.clear();
+    toAddAttributes.put(HOSTNAMES[0],
+        createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED,
+            10, "dist-node-attribute", "dist_v1"));
+    attributesManager.addNodeAttributes(toAddAttributes);
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(13, nodeAttributes.size());
+    clusterAttributes = attributesManager.getClusterNodeAttributes(
+        Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0]));
+    Assert.assertEquals(13, clusterAttributes.size());
+
+    // Replace by prefix
+    // Same distributed attributes names, but different values.
+    Set<NodeAttribute> toReplaceAttributes =
+        createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 5,
+            "dist-node-attribute", "dist_v2");
+
+    attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
+        ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes));
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(8, nodeAttributes.size());
+    clusterAttributes = attributesManager.getClusterNodeAttributes(
+        Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0]));
+    Assert.assertEquals(8, clusterAttributes.size());
+
+    // Now we have 5 distributed attributes
+    filteredAttributes = NodeLabelUtil.filterAttributesByPrefix(
+        nodeAttributes.keySet(), NodeAttribute.PREFIX_DISTRIBUTED);
+    Assert.assertEquals(5, filteredAttributes.size());
+    // Values are updated to have prefix dist_v2
+    Assert.assertTrue(filteredAttributes.stream().allMatch(
+        nodeAttribute ->
+            nodeAttribute.getAttributeValue().startsWith("dist_v2")));
+
+    // We still have 3 yarn.test1.io attributes
+    filteredAttributes = NodeLabelUtil.filterAttributesByPrefix(
+        nodeAttributes.keySet(), PREFIXES[0]);
+    Assert.assertEquals(3, filteredAttributes.size());
+
+    // Replace with prefix
+    // Different attribute names
+    toReplaceAttributes =
+        createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 1,
+            "dist-node-attribute-v2", "dist_v3");
+    attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
+        ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes));
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(4, nodeAttributes.size());
+    clusterAttributes = attributesManager.getClusterNodeAttributes(
+        Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED));
+    Assert.assertEquals(1, clusterAttributes.size());
+    NodeAttribute att = clusterAttributes.iterator().next();
+    Assert.assertEquals("dist-node-attribute-v2_0", att.getAttributeName());
+    Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED,
+        att.getAttributePrefix());
+    Assert.assertEquals("dist_v3_0", att.getAttributeValue());
+
+    // Replace all attributes
+    toReplaceMap.put(HOSTNAMES[0],
+        createAttributesForTest(PREFIXES[1], 2, "B", "B_v1"));
+    attributesManager.replaceNodeAttributes(null, toReplaceMap);
+
+    nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+    Assert.assertEquals(2, nodeAttributes.size());
+    clusterAttributes = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[1]));
+    Assert.assertEquals(2, clusterAttributes.size());
+    clusterAttributes = attributesManager
+        .getClusterNodeAttributes(Sets.newHashSet(
+            NodeAttribute.PREFIX_DISTRIBUTED));
+    Assert.assertEquals(0, clusterAttributes.size());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to