This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
     new ee8f24c  HDDS-1982. Extend SCMNodeManager to support decommission and 
maintenance states. Contributed by Stephen O'Donnell.
ee8f24c is described below

commit ee8f24ca4fcf364b12ae782beed790c7653bdafc
Author: Anu Engineer <aengin...@apache.org>
AuthorDate: Fri Sep 20 11:42:21 2019 -0700

    HDDS-1982. Extend SCMNodeManager to support decommission and maintenance 
states.
    Contributed by Stephen O'Donnell.
---
 hadoop-hdds/common/src/main/proto/hdds.proto       |  10 +-
 .../hdds/scm/block/SCMBlockDeletingService.java    |   7 +-
 .../placement/algorithms/SCMCommonPolicy.java      |   4 +-
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  |  50 +++-
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  31 ++-
 .../hadoop/hdds/scm/node/NodeStateManager.java     | 291 +++++++++++----------
 .../apache/hadoop/hdds/scm/node/NodeStatus.java    |  93 +++++++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  59 ++++-
 .../hadoop/hdds/scm/node/SCMNodeMetrics.java       |   8 -
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  | 241 ++++++++++++-----
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   4 +-
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   4 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |   4 +-
 .../hdds/scm/server/StorageContainerManager.java   |   3 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   3 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  37 ++-
 .../scm/container/TestContainerReportHandler.java  |  12 +-
 .../algorithms/TestContainerPlacementFactory.java  |   4 +-
 .../TestSCMContainerPlacementCapacity.java         |   4 +-
 .../TestSCMContainerPlacementRackAware.java        |   4 +-
 .../TestSCMContainerPlacementRandom.java           |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   2 +-
 .../hadoop/hdds/scm/node/TestNodeStateManager.java | 223 ++++++++++++++++
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  83 +++---
 .../hdds/scm/node/states/TestNodeStateMap.java     | 140 ++++++++++
 .../placement/TestContainerPlacement.java          |   6 +-
 .../testutils/ReplicationNodeManagerMock.java      |  36 ++-
 .../hadoop/hdds/scm/cli/TopologySubcommand.java    |   4 -
 .../hadoop/ozone/TestStorageContainerManager.java  |   5 +-
 .../hadoop/ozone/scm/node/TestSCMNodeMetrics.java  |   4 -
 30 files changed, 1063 insertions(+), 317 deletions(-)

diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto 
b/hadoop-hdds/common/src/main/proto/hdds.proto
index d2bb355..294f2b7 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -100,8 +100,14 @@ enum NodeState {
     HEALTHY = 1;
     STALE = 2;
     DEAD = 3;
-    DECOMMISSIONING = 4;
-    DECOMMISSIONED = 5;
+}
+
+enum NodeOperationalState {
+    IN_SERVICE = 1;
+    DECOMMISSIONING = 2;
+    DECOMMISSIONED = 3;
+    ENTERING_MAINTENANCE = 4;
+    IN_MAINTENANCE = 5;
 }
 
 enum QueryScope {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index ad77624..b5e5d16 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
@@ -137,7 +137,10 @@ public class SCMBlockDeletingService extends 
BackgroundService {
       // to delete blocks.
       LOG.debug("Running DeletedBlockTransactionScanner");
       DatanodeDeletedBlockTransactions transactions = null;
-      List<DatanodeDetails> datanodes = 
nodeManager.getNodes(NodeState.HEALTHY);
+      // TODO - DECOMM - should we be deleting blocks from decom nodes
+      //        and what about entering maintenance.
+      List<DatanodeDetails> datanodes =
+          nodeManager.getNodes(NodeStatus.inServiceHealthy());
       Map<Long, Long> transactionMap = null;
       if (datanodes != null) {
         transactions = new DatanodeDeletedBlockTransactions(containerManager,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
index 77cdd83..63cd8e1 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
@@ -23,7 +23,7 @@ import 
org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +108,7 @@ public abstract class SCMCommonPolicy implements 
ContainerPlacementPolicy {
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
       int nodesRequired, final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
-        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+        nodeManager.getNodes(NodeStatus.inServiceHealthy());
     if (excludedNodes != null) {
       healthyNodes.removeAll(excludedNodes);
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index d06ea2a..66e1ca4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.node;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
@@ -41,25 +42,39 @@ public class DatanodeInfo extends DatanodeDetails {
 
   private List<StorageReportProto> storageReports;
 
+  private NodeStatus nodeStatus;
+
   /**
    * Constructs DatanodeInfo from DatanodeDetails.
    *
    * @param datanodeDetails Details about the datanode
    */
-  public DatanodeInfo(DatanodeDetails datanodeDetails) {
+  public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus) {
     super(datanodeDetails);
     this.lock = new ReentrantReadWriteLock();
     this.lastHeartbeatTime = Time.monotonicNow();
     this.storageReports = Collections.emptyList();
+    this.nodeStatus = nodeStatus;
   }
 
   /**
    * Updates the last heartbeat time with current time.
    */
   public void updateLastHeartbeatTime() {
+    updateLastHeartbeatTime(Time.monotonicNow());
+  }
+
+  /**
+   * Sets the last heartbeat time to a given value. Intended to be used
+   * only for tests.
+   *
+   * @param milliSecondsSinceEpoch - ms since Epoch to set as the heartbeat 
time
+   */
+  @VisibleForTesting
+  public void updateLastHeartbeatTime(long milliSecondsSinceEpoch) {
     try {
       lock.writeLock().lock();
-      lastHeartbeatTime = Time.monotonicNow();
+      lastHeartbeatTime = milliSecondsSinceEpoch;
     } finally {
       lock.writeLock().unlock();
     }
@@ -109,6 +124,37 @@ public class DatanodeInfo extends DatanodeDetails {
   }
 
   /**
+   * Return the current NodeStatus for the datanode.
+   *
+   * @return NodeStatus - the current nodeStatus
+   */
+  public NodeStatus getNodeStatus() {
+    try {
+      lock.readLock().lock();
+      return nodeStatus;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Update the NodeStatus for this datanode. When using this method
+   * be ware of the potential for lost updates if two threads read the
+   * current status, update one field and then write it back without
+   * locking enforced outside of this class.
+   *
+   * @param newNodeStatus - the new NodeStatus object
+   */
+  public void setNodeStatus(NodeStatus newNodeStatus) {
+    try {
+      lock.writeLock().lock();
+      this.nodeStatus = newNodeStatus;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
    * Returns the last updated time of datanode info.
    * @return the last updated time of datanode info.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index d8890fb..afff7a3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -26,6 +26,7 @@ import 
org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -63,18 +64,38 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
     EventHandler<CommandForDatanode>, NodeManagerMXBean, Closeable {
 
   /**
+   * Gets all Live Datanodes that are currently communicating with SCM.
+   * @param nodeStatus - Status of the node to return
+   * @return List of Datanodes that are Heartbeating SCM.
+   */
+  List<DatanodeDetails> getNodes(NodeStatus nodeStatus);
+
+  /**
    * Gets all Live Datanodes that is currently communicating with SCM.
-   * @param nodeState - State of the node
+   * @param opState - The operational state of the node
+   * @param health - The health of the node
    * @return List of Datanodes that are Heartbeating SCM.
    */
-  List<DatanodeDetails> getNodes(NodeState nodeState);
+  List<DatanodeDetails> getNodes(
+      NodeOperationalState opState, NodeState health);
+
+  /**
+   * Returns the Number of Datanodes that are communicating with SCM with the
+   * given status.
+   * @param nodeStatus - State of the node
+   * @return int -- count
+   */
+  int getNodeCount(NodeStatus nodeStatus);
 
   /**
-   * Returns the Number of Datanodes that are communicating with SCM.
-   * @param nodeState - State of the node
+   * Returns the Number of Datanodes that are communicating with SCM in the
+   * given state.
+   * @param opState - The operational state of the node
+   * @param health - The health of the node
    * @return int -- count
    */
-  int getNodeCount(NodeState nodeState);
+  int getNodeCount(
+      NodeOperationalState opState, NodeState health);
 
   /**
    * Get all datanodes known to SCM.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 954cb0e..1e1a50c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -23,7 +23,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -75,7 +77,12 @@ public class NodeStateManager implements Runnable, Closeable 
{
    * Node's life cycle events.
    */
   private enum NodeLifeCycleEvent {
-    TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED
+    TIMEOUT, RESTORE, RESURRECT
+  }
+
+  private enum NodeOperationStateEvent {
+    START_DECOMMISSION, COMPLETE_DECOMMISSION, START_MAINTENANCE,
+    ENTER_MAINTENANCE, RETURN_TO_SERVICE
   }
 
   private static final Logger LOG = LoggerFactory
@@ -84,7 +91,12 @@ public class NodeStateManager implements Runnable, Closeable 
{
   /**
    * StateMachine for node lifecycle.
    */
-  private final StateMachine<NodeState, NodeLifeCycleEvent> stateMachine;
+  private final StateMachine<NodeState, NodeLifeCycleEvent> nodeHealthSM;
+  /**
+   * StateMachine for node operational state.
+   */
+  private final StateMachine<HddsProtos.NodeOperationalState,
+      NodeOperationStateEvent> nodeOpStateSM;
   /**
    * This is the map which maintains the current state of all datanodes.
    */
@@ -100,7 +112,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
   /**
    * Maps the event to be triggered when a node state us updated.
    */
-  private final Map<NodeState, Event<DatanodeDetails>> state2EventMap;
+  private final Map<NodeStatus, Event<DatanodeDetails>> state2EventMap;
   /**
    * ExecutorService used for scheduling heartbeat processing thread.
    */
@@ -150,9 +162,11 @@ public class NodeStateManager implements Runnable, 
Closeable {
     this.state2EventMap = new HashMap<>();
     initialiseState2EventMap();
     Set<NodeState> finalStates = new HashSet<>();
-    finalStates.add(NodeState.DECOMMISSIONED);
-    this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates);
-    initializeStateMachine();
+    Set<HddsProtos.NodeOperationalState> opStateFinalStates = new HashSet<>();
+    this.nodeHealthSM = new StateMachine<>(NodeState.HEALTHY, finalStates);
+    this.nodeOpStateSM = new StateMachine<>(
+        NodeOperationalState.IN_SERVICE, opStateFinalStates);
+    initializeStateMachines();
     heartbeatCheckerIntervalMs = HddsServerUtil
         .getScmheartbeatCheckerInterval(conf);
     staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf);
@@ -176,10 +190,12 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * Populates state2event map.
    */
   private void initialiseState2EventMap() {
-    state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
-    state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
-    state2EventMap
-        .put(NodeState.HEALTHY, SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE);
+    state2EventMap.put(NodeStatus.inServiceStale(), SCMEvents.STALE_NODE);
+    state2EventMap.put(NodeStatus.inServiceDead(), SCMEvents.DEAD_NODE);
+    state2EventMap.put(NodeStatus.inServiceHealthy(),
+        SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE);
+    // TODO - add whatever events are needed for decomm / maint to stale, dead,
+    //        healthy
   }
 
   /*
@@ -198,18 +214,6 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * State: DEAD            -------------------> HEALTHY
    * Event:                       RESURRECT
    *
-   * State: HEALTHY         -------------------> DECOMMISSIONING
-   * Event:                     DECOMMISSION
-   *
-   * State: STALE           -------------------> DECOMMISSIONING
-   * Event:                     DECOMMISSION
-   *
-   * State: DEAD            -------------------> DECOMMISSIONING
-   * Event:                     DECOMMISSION
-   *
-   * State: DECOMMISSIONING -------------------> DECOMMISSIONED
-   * Event:                     DECOMMISSIONED
-   *
    *  Node State Flow
    *
    *  +--------------------------------------------------------+
@@ -219,47 +223,51 @@ public class NodeStateManager implements Runnable, 
Closeable {
    *  |   |                          |                         |
    *  V   V                          |                         |
    * [HEALTHY]------------------->[STALE]------------------->[DEAD]
-   *    |         (TIMEOUT)          |         (TIMEOUT)       |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    |                            |                         |
-   *    | (DECOMMISSION)             | (DECOMMISSION)          | (DECOMMISSION)
-   *    |                            V                         |
-   *    +------------------->[DECOMMISSIONING]<----------------+
-   *                                 |
-   *                                 | (DECOMMISSIONED)
-   *                                 |
-   *                                 V
-   *                          [DECOMMISSIONED]
    *
    */
 
   /**
    * Initializes the lifecycle of node state machine.
    */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(
+  private void initializeStateMachines() {
+    nodeHealthSM.addTransition(
         NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
-    stateMachine.addTransition(
+    nodeHealthSM.addTransition(
         NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
-    stateMachine.addTransition(
+    nodeHealthSM.addTransition(
         NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE);
-    stateMachine.addTransition(
+    nodeHealthSM.addTransition(
         NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT);
-    stateMachine.addTransition(
-        NodeState.HEALTHY, NodeState.DECOMMISSIONING,
-        NodeLifeCycleEvent.DECOMMISSION);
-    stateMachine.addTransition(
-        NodeState.STALE, NodeState.DECOMMISSIONING,
-        NodeLifeCycleEvent.DECOMMISSION);
-    stateMachine.addTransition(
-        NodeState.DEAD, NodeState.DECOMMISSIONING,
-        NodeLifeCycleEvent.DECOMMISSION);
-    stateMachine.addTransition(
-        NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
-        NodeLifeCycleEvent.DECOMMISSIONED);
 
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.IN_SERVICE, NodeOperationalState.DECOMMISSIONING,
+        NodeOperationStateEvent.START_DECOMMISSION);
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.DECOMMISSIONING, NodeOperationalState.IN_SERVICE,
+        NodeOperationStateEvent.RETURN_TO_SERVICE);
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.DECOMMISSIONING,
+        NodeOperationalState.DECOMMISSIONED,
+        NodeOperationStateEvent.COMPLETE_DECOMMISSION);
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.DECOMMISSIONED, NodeOperationalState.IN_SERVICE,
+        NodeOperationStateEvent.RETURN_TO_SERVICE);
+
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.IN_SERVICE,
+        NodeOperationalState.ENTERING_MAINTENANCE,
+        NodeOperationStateEvent.START_MAINTENANCE);
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.ENTERING_MAINTENANCE,
+        NodeOperationalState.IN_SERVICE,
+        NodeOperationStateEvent.RETURN_TO_SERVICE);
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.ENTERING_MAINTENANCE,
+        NodeOperationalState.IN_MAINTENANCE,
+        NodeOperationStateEvent.ENTER_MAINTENANCE);
+    nodeOpStateSM.addTransition(
+        NodeOperationalState.IN_MAINTENANCE, NodeOperationalState.IN_SERVICE,
+        NodeOperationStateEvent.RETURN_TO_SERVICE);
   }
 
   /**
@@ -271,7 +279,8 @@ public class NodeStateManager implements Runnable, 
Closeable {
    */
   public void addNode(DatanodeDetails datanodeDetails)
       throws NodeAlreadyExistsException {
-    nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState());
+    nodeStateMap.addNode(datanodeDetails, new NodeStatus(
+        nodeOpStateSM.getInitialState(), nodeHealthSM.getInitialState()));
     eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
   }
 
@@ -317,59 +326,63 @@ public class NodeStateManager implements Runnable, 
Closeable {
    *
    * @throws NodeNotFoundException if the node is not present
    */
-  public NodeState getNodeState(DatanodeDetails datanodeDetails)
+  public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails)
       throws NodeNotFoundException {
-    return nodeStateMap.getNodeState(datanodeDetails.getUuid());
+    return nodeStateMap.getNodeStatus(datanodeDetails.getUuid());
   }
 
   /**
-   * Returns all the node which are in healthy state.
+   * Returns all the node which are in healthy state, ignoring the operational
+   * state.
    *
    * @return list of healthy nodes
    */
   public List<DatanodeInfo> getHealthyNodes() {
-    return getNodes(NodeState.HEALTHY);
+    return getNodes(null, NodeState.HEALTHY);
   }
 
   /**
-   * Returns all the node which are in stale state.
+   * Returns all the node which are in stale state, ignoring the operational
+   * state.
    *
    * @return list of stale nodes
    */
   public List<DatanodeInfo> getStaleNodes() {
-    return getNodes(NodeState.STALE);
+    return getNodes(null, NodeState.STALE);
   }
 
   /**
-   * Returns all the node which are in dead state.
+   * Returns all the node which are in dead state, ignoring the operational
+   * state.
    *
    * @return list of dead nodes
    */
   public List<DatanodeInfo> getDeadNodes() {
-    return getNodes(NodeState.DEAD);
+    return getNodes(null, NodeState.DEAD);
   }
 
   /**
-   * Returns all the node which are in the specified state.
+   * Returns all the nodes with the specified status.
    *
-   * @param state NodeState
+   * @param status NodeStatus
    *
    * @return list of nodes
    */
-  public List<DatanodeInfo> getNodes(NodeState state) {
-    List<DatanodeInfo> nodes = new ArrayList<>();
-    nodeStateMap.getNodes(state).forEach(
-        uuid -> {
-          try {
-            nodes.add(nodeStateMap.getNodeInfo(uuid));
-          } catch (NodeNotFoundException e) {
-            // This should not happen unless someone else other than
-            // NodeStateManager is directly modifying NodeStateMap and removed
-            // the node entry after we got the list of UUIDs.
-            LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
-          }
-        });
-    return nodes;
+  public List<DatanodeInfo> getNodes(NodeStatus status) {
+    return nodeStateMap.getDatanodeInfos(status);
+  }
+
+  /**
+   * Returns all the nodes with the specified operationalState and health.
+   *
+   * @param opState The operationalState of the node
+   * @param health  The node health
+   *
+   * @return list of nodes matching the passed states
+   */
+  public List<DatanodeInfo> getNodes(
+      NodeOperationalState opState, NodeState health) {
+    return nodeStateMap.getDatanodeInfos(opState, health);
   }
 
   /**
@@ -378,19 +391,7 @@ public class NodeStateManager implements Runnable, 
Closeable {
    * @return all the managed nodes
    */
   public List<DatanodeInfo> getAllNodes() {
-    List<DatanodeInfo> nodes = new ArrayList<>();
-    nodeStateMap.getAllNodes().forEach(
-        uuid -> {
-          try {
-            nodes.add(nodeStateMap.getNodeInfo(uuid));
-          } catch (NodeNotFoundException e) {
-            // This should not happen unless someone else other than
-            // NodeStateManager is directly modifying NodeStateMap and removed
-            // the node entry after we got the list of UUIDs.
-            LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
-          }
-        });
-    return nodes;
+    return nodeStateMap.getAllDatanodeInfos();
   }
 
   /**
@@ -403,41 +404,53 @@ public class NodeStateManager implements Runnable, 
Closeable {
   }
 
   /**
-   * Returns the count of healthy nodes.
+   * Returns the count of healthy nodes, ignoring operational state.
    *
    * @return healthy node count
    */
   public int getHealthyNodeCount() {
-    return getNodeCount(NodeState.HEALTHY);
+    return getHealthyNodes().size();
   }
 
   /**
-   * Returns the count of stale nodes.
+   * Returns the count of stale nodes, ignoring operational state.
    *
    * @return stale node count
    */
   public int getStaleNodeCount() {
-    return getNodeCount(NodeState.STALE);
+    return getStaleNodes().size();
   }
 
   /**
-   * Returns the count of dead nodes.
+   * Returns the count of dead nodes, ignoring operational state.
    *
    * @return dead node count
    */
   public int getDeadNodeCount() {
-    return getNodeCount(NodeState.DEAD);
+    return getDeadNodes().size();
   }
 
   /**
-   * Returns the count of nodes in specified state.
+   * Returns the count of nodes in specified status.
    *
-   * @param state NodeState
+   * @param status NodeState
    *
    * @return node count
    */
-  public int getNodeCount(NodeState state) {
-    return nodeStateMap.getNodeCount(state);
+  public int getNodeCount(NodeStatus status) {
+    return nodeStateMap.getNodeCount(status);
+  }
+
+  /**
+   * Returns the count of nodes in the specified states.
+   *
+   * @param opState The operational state of the node
+   * @param health The health of the node
+   *
+   * @return node count
+   */
+  public int getNodeCount(NodeOperationalState opState, NodeState health) {
+    return nodeStateMap.getNodeCount(opState, health);
   }
 
   /**
@@ -536,7 +549,8 @@ public class NodeStateManager implements Runnable, 
Closeable {
     scheduleNextHealthCheck();
   }
 
-  private void checkNodesHealth() {
+  @VisibleForTesting
+  public void checkNodesHealth() {
 
     /*
      *
@@ -578,39 +592,32 @@ public class NodeStateManager implements Runnable, 
Closeable {
     Predicate<Long> deadNodeCondition =
         (lastHbTime) -> lastHbTime < staleNodeDeadline;
     try {
-      for (NodeState state : NodeState.values()) {
-        List<UUID> nodes = nodeStateMap.getNodes(state);
-        for (UUID id : nodes) {
-          DatanodeInfo node = nodeStateMap.getNodeInfo(id);
-          switch (state) {
-          case HEALTHY:
-            // Move the node to STALE if the last heartbeat time is less than
-            // configured stale-node interval.
-            updateNodeState(node, staleNodeCondition, state,
-                  NodeLifeCycleEvent.TIMEOUT);
-            break;
-          case STALE:
-            // Move the node to DEAD if the last heartbeat time is less than
-            // configured dead-node interval.
-            updateNodeState(node, deadNodeCondition, state,
-                NodeLifeCycleEvent.TIMEOUT);
-            // Restore the node if we have received heartbeat before configured
-            // stale-node interval.
-            updateNodeState(node, healthyNodeCondition, state,
-                NodeLifeCycleEvent.RESTORE);
-            break;
-          case DEAD:
-            // Resurrect the node if we have received heartbeat before
-            // configured stale-node interval.
-            updateNodeState(node, healthyNodeCondition, state,
-                NodeLifeCycleEvent.RESURRECT);
-            break;
-            // We don't do anything for DECOMMISSIONING and DECOMMISSIONED in
-            // heartbeat processing.
-          case DECOMMISSIONING:
-          case DECOMMISSIONED:
-          default:
-          }
+      for(DatanodeInfo node : nodeStateMap.getAllDatanodeInfos()) {
+        NodeStatus status = nodeStateMap.getNodeStatus(node.getUuid());
+        switch (status.getHealth()) {
+        case HEALTHY:
+          // Move the node to STALE if the last heartbeat time is less than
+          // configured stale-node interval.
+          updateNodeState(node, staleNodeCondition, status,
+              NodeLifeCycleEvent.TIMEOUT);
+          break;
+        case STALE:
+          // Move the node to DEAD if the last heartbeat time is less than
+          // configured dead-node interval.
+          updateNodeState(node, deadNodeCondition, status,
+              NodeLifeCycleEvent.TIMEOUT);
+          // Restore the node if we have received heartbeat before configured
+          // stale-node interval.
+          updateNodeState(node, healthyNodeCondition, status,
+              NodeLifeCycleEvent.RESTORE);
+          break;
+        case DEAD:
+          // Resurrect the node if we have received heartbeat before
+          // configured stale-node interval.
+          updateNodeState(node, healthyNodeCondition, status,
+              NodeLifeCycleEvent.RESURRECT);
+          break;
+        default:
         }
       }
     } catch (NodeNotFoundException e) {
@@ -669,27 +676,29 @@ public class NodeStateManager implements Runnable, 
Closeable {
    *
    * @param node DatanodeInfo
    * @param condition condition to check
-   * @param state current state of node
+   * @param status current status of node
    * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition
    *                       matches
    *
    * @throws NodeNotFoundException if the node is not present
    */
   private void updateNodeState(DatanodeInfo node, Predicate<Long> condition,
-      NodeState state, NodeLifeCycleEvent lifeCycleEvent)
+      NodeStatus status, NodeLifeCycleEvent lifeCycleEvent)
       throws NodeNotFoundException {
     try {
       if (condition.test(node.getLastHeartbeatTime())) {
-        NodeState newState = stateMachine.getNextState(state, lifeCycleEvent);
-        nodeStateMap.updateNodeState(node.getUuid(), state, newState);
-        if (state2EventMap.containsKey(newState)) {
-          eventPublisher.fireEvent(state2EventMap.get(newState), node);
+        NodeState newHealthState = nodeHealthSM.
+            getNextState(status.getHealth(), lifeCycleEvent);
+        NodeStatus newStatus =
+            nodeStateMap.updateNodeHealthState(node.getUuid(), newHealthState);
+        if (state2EventMap.containsKey(newStatus)) {
+          eventPublisher.fireEvent(state2EventMap.get(newStatus), node);
         }
       }
     } catch (InvalidStateTransitionException e) {
       LOG.warn("Invalid state transition of node {}." +
               " Current state: {}, life cycle event: {}",
-          node, state, lifeCycleEvent);
+          node, status.getHealth(), lifeCycleEvent);
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
new file mode 100644
index 0000000..0776c28
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.util.Objects;
+
+/**
+ * This class is used to capture the current status of a datanode. This
+ * includes its health (healthy, stale or dead) and its operation status (
+ * in_service, decommissioned and maintenance mode.
+ */
+public class NodeStatus {
+
+  private HddsProtos.NodeOperationalState operationalState;
+  private HddsProtos.NodeState health;
+
+  public NodeStatus(HddsProtos.NodeOperationalState operationalState,
+             HddsProtos.NodeState health) {
+    this.operationalState = operationalState;
+    this.health = health;
+  }
+
+  public static NodeStatus inServiceHealthy() {
+    return new NodeStatus(HddsProtos.NodeOperationalState.IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY);
+  }
+
+  public static NodeStatus inServiceStale() {
+    return new NodeStatus(HddsProtos.NodeOperationalState.IN_SERVICE,
+        HddsProtos.NodeState.STALE);
+  }
+
+  public static NodeStatus inServiceDead() {
+    return new NodeStatus(HddsProtos.NodeOperationalState.IN_SERVICE,
+        HddsProtos.NodeState.DEAD);
+  }
+
+  public HddsProtos.NodeState getHealth() {
+    return health;
+  }
+
+  public HddsProtos.NodeOperationalState getOperationalState() {
+    return operationalState;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    NodeStatus other = (NodeStatus) obj;
+    if (this.operationalState == other.operationalState &&
+        this.health == other.health) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(health, operationalState);
+  }
+
+  @Override
+  public String toString() {
+    return "OperationalState: "+operationalState+" Health: "+health;
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index d3df858..c277ea9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -150,8 +151,28 @@ public class SCMNodeManager implements NodeManager {
    * @return List of Datanodes that are known to SCM in the requested state.
    */
   @Override
-  public List<DatanodeDetails> getNodes(NodeState nodestate) {
-    return nodeStateManager.getNodes(nodestate).stream()
+  public List<DatanodeDetails> getNodes(NodeStatus nodeStatus) {
+    return nodeStateManager.getNodes(nodeStatus)
+        .stream()
+        .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
+  }
+
+  /**
+   * Returns all datanode that are in the given states. Passing null for one of
+   * of the states acts like a wildcard for that state. This function works by
+   * taking a snapshot of the current collection and then returning the list
+   * from that collection. This means that real map might have changed by the
+   * time we return this list.
+   *
+   * @param opState The operational state of the node
+   * @param health The health of the node
+   * @return List of Datanodes that are known to SCM in the requested states.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(
+      NodeOperationalState opState, NodeState health) {
+    return nodeStateManager.getNodes(opState, health)
+        .stream()
         .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
   }
 
@@ -172,8 +193,21 @@ public class SCMNodeManager implements NodeManager {
    * @return count
    */
   @Override
-  public int getNodeCount(NodeState nodestate) {
-    return nodeStateManager.getNodeCount(nodestate);
+  public int getNodeCount(NodeStatus nodeStatus) {
+    return nodeStateManager.getNodeCount(nodeStatus);
+  }
+
+  /**
+   * Returns the Number of Datanodes by State they are in. Passing null for
+   * either of the states acts like a wildcard for that state.
+   *
+   * @parem nodeOpState - The Operational State of the node
+   * @param health - The health of the node
+   * @return count
+   */
+  @Override
+  public int getNodeCount(NodeOperationalState nodeOpState, NodeState health) {
+    return nodeStateManager.getNodeCount(nodeOpState, health);
   }
 
   /**
@@ -185,7 +219,7 @@ public class SCMNodeManager implements NodeManager {
   @Override
   public NodeState getNodeState(DatanodeDetails datanodeDetails) {
     try {
-      return nodeStateManager.getNodeState(datanodeDetails);
+      return nodeStateManager.getNodeStatus(datanodeDetails).getHealth();
     } catch (NodeNotFoundException e) {
       // TODO: should we throw NodeNotFoundException?
       return null;
@@ -365,9 +399,9 @@ public class SCMNodeManager implements NodeManager {
     final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>();
 
     final List<DatanodeInfo> healthyNodes =  nodeStateManager
-        .getNodes(NodeState.HEALTHY);
+        .getHealthyNodes();
     final List<DatanodeInfo> staleNodes = nodeStateManager
-        .getNodes(NodeState.STALE);
+        .getStaleNodes();
     final List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
     datanodes.addAll(staleNodes);
 
@@ -417,9 +451,12 @@ public class SCMNodeManager implements NodeManager {
 
   @Override
   public Map<String, Integer> getNodeCount() {
+    // TODO - This does not consider decom, maint etc.
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
     for(NodeState state : NodeState.values()) {
-      nodeCountMap.put(state.toString(), getNodeCount(state));
+      // TODO - this iterate the node list once per state and needs
+      //        fixed to only perform one pass.
+      nodeCountMap.put(state.toString(), getNodeCount(null, state));
     }
     return nodeCountMap;
   }
@@ -436,10 +473,8 @@ public class SCMNodeManager implements NodeManager {
     long ssdUsed = 0L;
     long ssdRemaining = 0L;
 
-    List<DatanodeInfo> healthyNodes =  nodeStateManager
-        .getNodes(NodeState.HEALTHY);
-    List<DatanodeInfo> staleNodes = nodeStateManager
-        .getNodes(NodeState.STALE);
+    List<DatanodeInfo> healthyNodes =  nodeStateManager.getHealthyNodes();
+    List<DatanodeInfo> staleNodes = nodeStateManager.getStaleNodes();
 
     List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
     datanodes.addAll(staleNodes);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
index 1596523..fd9c9c1 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONED;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
 
@@ -132,12 +130,6 @@ public final class SCMNodeMetrics implements MetricsSource 
{
             .addGauge(Interns.info("DeadNodes",
                 "Number of dead datanodes"),
                 nodeCount.get(DEAD.toString()))
-            .addGauge(Interns.info("DecommissioningNodes",
-                "Number of decommissioning datanodes"),
-                nodeCount.get(DECOMMISSIONING.toString()))
-            .addGauge(Interns.info("DecommissionedNodes",
-                "Number of decommissioned datanodes"),
-                nodeCount.get(DECOMMISSIONED.toString()))
             .addGauge(Interns.info("DiskCapacity",
                 "Total disk capacity"),
                 nodeInfo.get("DISKCapacity"))
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
index 0c1ab2c..6565e81 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -20,13 +20,16 @@ package org.apache.hadoop.hdds.scm.node.states;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 /**
  * Maintains the state of datanodes in SCM. This class should only be used by
@@ -35,16 +38,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * this class.
  */
 public class NodeStateMap {
-
   /**
    * Node id to node info map.
    */
   private final ConcurrentHashMap<UUID, DatanodeInfo> nodeMap;
   /**
-   * Represents the current state of node.
-   */
-  private final ConcurrentHashMap<NodeState, Set<UUID>> stateMap;
-  /**
    * Node to set of containers on the node.
    */
   private final ConcurrentHashMap<UUID, Set<ContainerID>> nodeToContainer;
@@ -57,29 +55,18 @@ public class NodeStateMap {
   public NodeStateMap() {
     lock = new ReentrantReadWriteLock();
     nodeMap = new ConcurrentHashMap<>();
-    stateMap = new ConcurrentHashMap<>();
     nodeToContainer = new ConcurrentHashMap<>();
-    initStateMap();
-  }
-
-  /**
-   * Initializes the state map with available states.
-   */
-  private void initStateMap() {
-    for (NodeState state : NodeState.values()) {
-      stateMap.put(state, new HashSet<>());
-    }
   }
 
   /**
    * Adds a node to NodeStateMap.
    *
    * @param datanodeDetails DatanodeDetails
-   * @param nodeState initial NodeState
+   * @param nodeStatus initial NodeStatus
    *
    * @throws NodeAlreadyExistsException if the node already exist
    */
-  public void addNode(DatanodeDetails datanodeDetails, NodeState nodeState)
+  public void addNode(DatanodeDetails datanodeDetails, NodeStatus nodeStatus)
       throws NodeAlreadyExistsException {
     lock.writeLock().lock();
     try {
@@ -87,34 +74,54 @@ public class NodeStateMap {
       if (nodeMap.containsKey(id)) {
         throw new NodeAlreadyExistsException("Node UUID: " + id);
       }
-      nodeMap.put(id, new DatanodeInfo(datanodeDetails));
+      nodeMap.put(id, new DatanodeInfo(datanodeDetails, nodeStatus));
       nodeToContainer.put(id, Collections.emptySet());
-      stateMap.get(nodeState).add(id);
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   /**
-   * Updates the node state.
+   * Updates the node health state.
    *
    * @param nodeId Node Id
-   * @param currentState current state
-   * @param newState new state
+   * @param newHealth new health state
    *
    * @throws NodeNotFoundException if the node is not present
    */
-  public void updateNodeState(UUID nodeId, NodeState currentState,
-                              NodeState newState)throws NodeNotFoundException {
-    lock.writeLock().lock();
+  public NodeStatus updateNodeHealthState(UUID nodeId, NodeState newHealth)
+      throws NodeNotFoundException {
     try {
-      checkIfNodeExist(nodeId);
-      if (stateMap.get(currentState).remove(nodeId)) {
-        stateMap.get(newState).add(nodeId);
-      } else {
-        throw new NodeNotFoundException("Node UUID: " + nodeId +
-            ", not found in state: " + currentState);
-      }
+      lock.writeLock().lock();
+      DatanodeInfo dn = getNodeInfo(nodeId);
+      NodeStatus oldStatus = dn.getNodeStatus();
+      NodeStatus newStatus = new NodeStatus(
+          oldStatus.getOperationalState(), newHealth);
+      dn.setNodeStatus(newStatus);
+      return newStatus;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Updates the node operational state.
+   *
+   * @param nodeId Node Id
+   * @param newOpState new operational state
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public NodeStatus updateNodeOperationalState(UUID nodeId,
+      NodeOperationalState newOpState) throws NodeNotFoundException {
+    try {
+      lock.writeLock().lock();
+      DatanodeInfo dn = getNodeInfo(nodeId);
+      NodeStatus oldStatus = dn.getNodeStatus();
+      NodeStatus newStatus = new NodeStatus(
+          newOpState, oldStatus.getHealth());
+      dn.setNodeStatus(newStatus);
+      return newStatus;
     } finally {
       lock.writeLock().unlock();
     }
@@ -139,21 +146,38 @@ public class NodeStateMap {
     }
   }
 
-
   /**
    * Returns the list of node ids which are in the specified state.
    *
-   * @param state NodeState
+   * @param status NodeStatus
    *
    * @return list of node ids
    */
-  public List<UUID> getNodes(NodeState state) {
-    lock.readLock().lock();
-    try {
-      return new ArrayList<>(stateMap.get(state));
-    } finally {
-      lock.readLock().unlock();
+  public List<UUID> getNodes(NodeStatus status) {
+    ArrayList<UUID> nodes = new ArrayList<>();
+    for (DatanodeInfo dn : filterNodes(status)) {
+      nodes.add(dn.getUuid());
+    }
+    return nodes;
+  }
+
+  /**
+   * Returns the list of node ids which match the desired operational state
+   * and health. Passing a null for either value is equivalent to a wild card.
+   *
+   * Therefore, passing opState = null, health=stale will return all stale 
nodes
+   * regardless of their operational state.
+   *
+   * @param opState
+   * @param health
+   * @return The list of nodes matching the given states
+   */
+  public List<UUID> getNodes(NodeOperationalState opState, NodeState health) {
+    ArrayList<UUID> nodes = new ArrayList<>();
+    for (DatanodeInfo dn : filterNodes(opState, health)) {
+      nodes.add(dn.getUuid());
     }
+    return nodes;
   }
 
   /**
@@ -162,8 +186,8 @@ public class NodeStateMap {
    * @return list of all the node ids
    */
   public List<UUID> getAllNodes() {
-    lock.readLock().lock();
     try {
+      lock.readLock().lock();
       return new ArrayList<>(nodeMap.keySet());
     } finally {
       lock.readLock().unlock();
@@ -171,22 +195,72 @@ public class NodeStateMap {
   }
 
   /**
-   * Returns the count of nodes in the specified state.
-   *
-   * @param state NodeState
+   * Returns the list of all the nodes as DatanodeInfo objects.
    *
-   * @return Number of nodes in the specified state
+   * @return list of all the node ids
    */
-  public int getNodeCount(NodeState state) {
-    lock.readLock().lock();
+  public List<DatanodeInfo> getAllDatanodeInfos() {
     try {
-      return stateMap.get(state).size();
+      lock.readLock().lock();
+      return new ArrayList<>(nodeMap.values());
     } finally {
       lock.readLock().unlock();
     }
   }
 
   /**
+   * Returns a list of the nodes as DatanodeInfo objects matching the passed
+   * status.
+   *
+   * @param status - The status of the nodes to return
+   * @return List of DatanodeInfo for the matching nodes
+   */
+  public List<DatanodeInfo> getDatanodeInfos(NodeStatus status) {
+    return filterNodes(status);
+  }
+
+  /**
+   * Returns a list of the nodes as DatanodeInfo objects matching the passed
+   * states. Passing null for either of the state values acts as a wildcard
+   * for that state.
+   *
+   * @param opState - The node operational state
+   * @param health - The node health
+   * @return List of DatanodeInfo for the matching nodes
+   */
+  public List<DatanodeInfo> getDatanodeInfos(
+      NodeOperationalState opState, NodeState health) {
+    return filterNodes(opState, health);
+  }
+
+  /**
+   * Returns the count of nodes in the specified state.
+   *
+   * @param state NodeStatus
+   *
+   * @return Number of nodes in the specified state
+   */
+  public int getNodeCount(NodeStatus state) {
+    return getNodes(state).size();
+  }
+
+  /**
+   * Returns the count of node ids which match the desired operational state
+   * and health. Passing a null for either value is equivalent to a wild card.
+   *
+   * Therefore, passing opState=null, health=stale will count all stale nodes
+   * regardless of their operational state.
+   *
+   * @param opState
+   * @param health
+   *
+   * @return Number of nodes in the specified state
+   */
+  public int getNodeCount(NodeOperationalState opState, NodeState health) {
+    return getNodes(opState, health).size();
+  }
+
+  /**
    * Returns the total node count.
    *
    * @return node count
@@ -209,17 +283,15 @@ public class NodeStateMap {
    *
    * @throws NodeNotFoundException if the node is not found
    */
-  public NodeState getNodeState(UUID uuid) throws NodeNotFoundException {
+  public NodeStatus getNodeStatus(UUID uuid) throws NodeNotFoundException {
     lock.readLock().lock();
     try {
-      checkIfNodeExist(uuid);
-      for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
-        if (entry.getValue().contains(uuid)) {
-          return entry.getKey();
-        }
+      DatanodeInfo dn = nodeMap.get(uuid);
+      if (dn == null) {
+        throw new NodeNotFoundException("Node not found in node map." +
+            " UUID: " + uuid);
       }
-      throw new NodeNotFoundException("Node not found in node state map." +
-          " UUID: " + uuid);
+      return dn.getNodeStatus();
     } finally {
       lock.readLock().unlock();
     }
@@ -289,12 +361,13 @@ public class NodeStateMap {
    */
   @Override
   public String toString() {
+    // TODO - fix this method to include the commented out values
     StringBuilder builder = new StringBuilder();
     builder.append("Total number of nodes: ").append(getTotalNodeCount());
-    for (NodeState state : NodeState.values()) {
-      builder.append("Number of nodes in ").append(state).append(" state: ")
-          .append(getNodeCount(state));
-    }
+   // for (NodeState state : NodeState.values()) {
+   //   builder.append("Number of nodes in ").append(state).append(" state: ")
+   //       .append(getNodeCount(state));
+   // }
     return builder.toString();
   }
 
@@ -309,4 +382,50 @@ public class NodeStateMap {
       throw new NodeNotFoundException("Node UUID: " + uuid);
     }
   }
+
+  /**
+   * Create a list of datanodeInfo for all nodes matching the passed states.
+   * Passing null for one of the states acts like a wildcard for that state.
+   *
+   * @param opState
+   * @param health
+   * @return List of DatanodeInfo objects matching the passed state
+   */
+  private List<DatanodeInfo> filterNodes(
+      NodeOperationalState opState, NodeState health) {
+    if (opState != null && health != null) {
+      return filterNodes(new NodeStatus(opState, health));
+    }
+    if (opState == null && health == null) {
+      return getAllDatanodeInfos();
+    }
+    try {
+      lock.readLock().lock();
+      return nodeMap.values().stream()
+          .filter(n -> opState == null
+              || n.getNodeStatus().getOperationalState() == opState)
+          .filter(n -> health == null
+              || n.getNodeStatus().getHealth() == health)
+          .collect(Collectors.toList());
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Create a list of datanodeInfo for all nodes matching the passsed status.
+   *
+   * @param status
+   * @return List of DatanodeInfo objects matching the passed state
+   */
+  private List<DatanodeInfo> filterNodes(NodeStatus status) {
+    try {
+      lock.readLock().lock();
+      return nodeMap.values().stream()
+          .filter(n -> n.getNodeStatus().equals(status))
+          .collect(Collectors.toList());
+    }  finally {
+      lock.readLock().unlock();
+    }
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index a5e3d37..1cebef6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -22,12 +22,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -141,7 +141,7 @@ public class RatisPipelineProvider implements 
PipelineProvider {
 
     // Get list of healthy nodes
     List<DatanodeDetails> dns =
-        nodeManager.getNodes(NodeState.HEALTHY)
+        nodeManager.getNodes(NodeStatus.inServiceHealthy())
             .parallelStream()
             .filter(dn -> !dnsUsed.contains(dn))
             .limit(factor.getNumber())
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa..dad26b58 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 
 import java.io.IOException;
@@ -43,7 +43,7 @@ public class SimplePipelineProvider implements 
PipelineProvider {
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
     List<DatanodeDetails> dns =
-        nodeManager.getNodes(NodeState.HEALTHY);
+        nodeManager.getNodes(NodeStatus.inServiceHealthy());
     if (dns.size() < factor.getNumber()) {
       String e = String
           .format("Cannot create pipeline of factor %d using %d nodes.",
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 7d9cb3e..9e9d2fe 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -548,7 +548,9 @@ public class SCMClientProtocolServer implements
    */
   private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) {
     Set<DatanodeDetails> returnSet = new TreeSet<>();
-    List<DatanodeDetails> tmp = scm.getScmNodeManager().getNodes(nodeState);
+    // TODO - decomm states needed
+    List<DatanodeDetails> tmp = scm.getScmNodeManager()
+        .getNodes(null, nodeState);
     if ((tmp != null) && (tmp.size() > 0)) {
       returnSet.addAll(tmp);
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 4ecab37..702102b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -923,7 +923,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
    * @return int -- count
    */
   public int getNodeCount(NodeState nodestate) {
-    return scmNodeManager.getNodeCount(nodestate);
+    // TODO - decomm - this probably needs to accept opState and health
+    return scmNodeManager.getNodeCount(null, nodestate);
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index e5c4766..bc1e771 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -264,7 +265,7 @@ public class TestBlockManager {
 
     // create pipelines
     for (int i = 0;
-         i < nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size(); i++) {
+         i < nodeManager.getNodes(NodeStatus.inServiceHealthy()).size(); i++) {
       pipelineManager.createPipeline(type, factor);
     }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index b7a9813..e27a451 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -147,14 +148,28 @@ public class MockNodeManager implements NodeManager {
     this.safemode = safemode;
   }
 
+
   /**
    * Gets all Live Datanodes that is currently communicating with SCM.
    *
-   * @param nodestate - State of the node
+   * @param status The status of the node
+   * @return List of Datanodes that are Heartbeating SCM.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(NodeStatus status) {
+    return getNodes(status.getOperationalState(), status.getHealth());
+  }
+
+  /**
+   * Gets all Live Datanodes that is currently communicating with SCM.
+   *
+   * @param opState - The operational State of the node
+   * @param nodestate - The health of the node
    * @return List of Datanodes that are Heartbeating SCM.
    */
   @Override
-  public List<DatanodeDetails> getNodes(HddsProtos.NodeState nodestate) {
+  public List<DatanodeDetails> getNodes(
+      HddsProtos.NodeOperationalState opState, HddsProtos.NodeState nodestate) 
{
     if (nodestate == HEALTHY) {
       return healthyNodes;
     }
@@ -173,12 +188,24 @@ public class MockNodeManager implements NodeManager {
   /**
    * Returns the Number of Datanodes that are communicating with SCM.
    *
+   * @param status - Status of the node
+   * @return int -- count
+   */
+  @Override
+  public int getNodeCount(NodeStatus status) {
+    return getNodeCount(status.getOperationalState(), status.getHealth());
+  }
+
+  /**
+   * Returns the Number of Datanodes that are communicating with SCM.
+   *
    * @param nodestate - State of the node
    * @return int -- count
    */
   @Override
-  public int getNodeCount(HddsProtos.NodeState nodestate) {
-    List<DatanodeDetails> nodes = getNodes(nodestate);
+  public int getNodeCount(
+      HddsProtos.NodeOperationalState opState, HddsProtos.NodeState nodestate) 
{
+    List<DatanodeDetails> nodes = getNodes(opState, nodestate);
     if (nodes != null) {
       return nodes.size();
     }
@@ -419,7 +446,7 @@ public class MockNodeManager implements NodeManager {
   public Map<String, Integer> getNodeCount() {
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
     for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
-      nodeCountMap.put(state.toString(), getNodeCount(state));
+      nodeCountMap.put(state.toString(), getNodeCount(null, state));
     }
     return nodeCountMap;
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 41585bc..2a93e3b 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@ -28,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
@@ -114,7 +114,7 @@ public class TestContainerReportHandler {
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
         nodeManager, containerManager);
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
-        NodeState.HEALTHY).iterator();
+        NodeStatus.inServiceHealthy()).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
@@ -183,7 +183,7 @@ public class TestContainerReportHandler {
         nodeManager, containerManager);
 
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
-        NodeState.HEALTHY).iterator();
+        NodeStatus.inServiceHealthy()).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
@@ -262,7 +262,7 @@ public class TestContainerReportHandler {
         nodeManager, containerManager);
 
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
-        NodeState.HEALTHY).iterator();
+        NodeStatus.inServiceHealthy()).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
@@ -341,7 +341,7 @@ public class TestContainerReportHandler {
         nodeManager, containerManager);
 
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
-        NodeState.HEALTHY).iterator();
+        NodeStatus.inServiceHealthy()).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
@@ -418,7 +418,7 @@ public class TestContainerReportHandler {
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
         nodeManager, containerManager);
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
-        NodeState.HEALTHY).iterator();
+        NodeStatus.inServiceHealthy()).iterator();
 
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index 18c4a64..54c4080 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -19,7 +19,6 @@ package 
org.apache.hadoop.hdds.scm.container.placement.algorithms;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -29,6 +28,7 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -88,7 +88,7 @@ public class TestContainerPlacementFactory {
 
     // create mock node manager
     nodeManager = Mockito.mock(NodeManager.class);
-    when(nodeManager.getNodes(NodeState.HEALTHY))
+    when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
         .thenReturn(new ArrayList<>(datanodes));
     when(nodeManager.getNodeStat(anyObject()))
         .thenReturn(new SCMNodeMetric(storageCapacity, 0L, 100L));
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
index 00ec398..fcb4f44 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
@@ -24,12 +24,12 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.junit.Assert;
 import org.junit.Test;
 import static org.mockito.Matchers.anyObject;
@@ -51,7 +51,7 @@ public class TestSCMContainerPlacementCapacity {
     }
 
     NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
-    when(mockNodeManager.getNodes(NodeState.HEALTHY))
+    when(mockNodeManager.getNodes(NodeStatus.inServiceHealthy()))
         .thenReturn(new ArrayList<>(datanodes));
 
     when(mockNodeManager.getNodeStat(anyObject()))
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index 2d8b816..03dd829 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -20,7 +20,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -30,6 +29,7 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -102,7 +102,7 @@ public class TestSCMContainerPlacementRackAware {
 
     // create mock node manager
     nodeManager = Mockito.mock(NodeManager.class);
-    when(nodeManager.getNodes(NodeState.HEALTHY))
+    when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
         .thenReturn(new ArrayList<>(datanodes));
     when(nodeManager.getNodeStat(anyObject()))
         .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 0L, 100L));
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
index 43e3a8d..5edb25f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
@@ -22,12 +22,12 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.junit.Assert;
 import org.junit.Test;
 import static org.mockito.Matchers.anyObject;
@@ -50,7 +50,7 @@ public class TestSCMContainerPlacementRandom {
     }
 
     NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
-    when(mockNodeManager.getNodes(NodeState.HEALTHY))
+    when(mockNodeManager.getNodes(NodeStatus.inServiceHealthy()))
         .thenReturn(new ArrayList<>(datanodes));
 
     when(mockNodeManager.getNodeStat(anyObject()))
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 26ffd8d..af9d5e5 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -150,7 +150,7 @@ public class TestContainerPlacement {
 
       //TODO: wait for heartbeat to be processed
       Thread.sleep(4 * 1000);
-      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(nodeCount, nodeManager.getNodeCount(null, HEALTHY));
       assertEquals(capacity * nodeCount,
           (long) nodeManager.getStats().getCapacity().get());
       assertEquals(used * nodeCount,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
new file mode 100644
index 0000000..bc28a43
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNull;
+
+/**
+ * Class to test the NodeStateManager, which is an internal class used by
+ * the SCMNodeManager.
+ */
+
+public class TestNodeStateManager {
+
+  private NodeStateManager nsm;
+  private Configuration conf;
+  private MockEventPublisher eventPublisher;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    eventPublisher = new MockEventPublisher();
+    nsm = new NodeStateManager(conf, eventPublisher);
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  @Test
+  public void testNodeCanBeAddedAndRetrieved()
+      throws NodeAlreadyExistsException, NodeNotFoundException {
+    // Create a datanode, then add and retrieve it
+    DatanodeDetails dn = generateDatanode();
+    nsm.addNode(dn);
+    assertEquals(dn.getUuid(), nsm.getNode(dn).getUuid());
+    // Now get the status of the newly added node and it should be
+    // IN_SERVICE and HEALTHY
+    NodeStatus expectedState = NodeStatus.inServiceHealthy();
+    assertEquals(expectedState, nsm.getNodeStatus(dn));
+  }
+
+  @Test
+  public void testGetAllNodesReturnsCorrectly()
+      throws NodeAlreadyExistsException {
+    DatanodeDetails dn = generateDatanode();
+    nsm.addNode(dn);
+    dn = generateDatanode();
+    nsm.addNode(dn);
+    assertEquals(2, nsm.getAllNodes().size());
+    assertEquals(2, nsm.getTotalNodeCount());
+  }
+
+  @Test
+  public void testGetNodeCountReturnsCorrectly()
+      throws NodeAlreadyExistsException {
+    DatanodeDetails dn = generateDatanode();
+    nsm.addNode(dn);
+    assertEquals(1, nsm.getNodes(NodeStatus.inServiceHealthy()).size());
+    assertEquals(0, nsm.getNodes(NodeStatus.inServiceStale()).size());
+  }
+
+  @Test
+  public void testGetNodeCount() throws NodeAlreadyExistsException {
+    DatanodeDetails dn = generateDatanode();
+    nsm.addNode(dn);
+    assertEquals(1, nsm.getNodeCount(NodeStatus.inServiceHealthy()));
+    assertEquals(0, nsm.getNodeCount(NodeStatus.inServiceStale()));
+  }
+
+  @Test
+  public void testNodesMarkedDeadAndStale()
+      throws NodeAlreadyExistsException, NodeNotFoundException {
+    long now = Time.monotonicNow();
+
+    // Set the dead and stale limits to be 1 second larger than configured
+    long staleLimit = HddsServerUtil.getStaleNodeInterval(conf) + 1000;
+    long deadLimit = HddsServerUtil.getDeadNodeInterval(conf) + 1000;
+
+    DatanodeDetails staleDn = generateDatanode();
+    nsm.addNode(staleDn);
+    nsm.getNode(staleDn).updateLastHeartbeatTime(now - staleLimit);
+
+    DatanodeDetails deadDn = generateDatanode();
+    nsm.addNode(deadDn);
+    nsm.getNode(deadDn).updateLastHeartbeatTime(now - deadLimit);
+
+    DatanodeDetails healthyDn = generateDatanode();
+    nsm.addNode(healthyDn);
+    nsm.getNode(healthyDn).updateLastHeartbeatTime();
+
+    nsm.checkNodesHealth();
+    assertEquals(healthyDn, nsm.getHealthyNodes().get(0));
+    // A node cannot go directly to dead. It must be marked stale first
+    // due to the allowed state transitions. Therefore we will initially have 2
+    // stale nodesCheck it is in stale nodes
+    assertEquals(2, nsm.getStaleNodes().size());
+    // Now check health again and it should be in deadNodes()
+    nsm.checkNodesHealth();
+    assertEquals(staleDn, nsm.getStaleNodes().get(0));
+    assertEquals(deadDn, nsm.getDeadNodes().get(0));
+  }
+
+  @Test
+  public void testNodeCanTransitionThroughHealthStatesAndFiresEvents()
+      throws NodeAlreadyExistsException, NodeNotFoundException {
+    long now = Time.monotonicNow();
+
+    // Set the dead and stale limits to be 1 second larger than configured
+    long staleLimit = HddsServerUtil.getStaleNodeInterval(conf) + 1000;
+    long deadLimit = HddsServerUtil.getDeadNodeInterval(conf) + 1000;
+
+    DatanodeDetails dn = generateDatanode();
+    nsm.addNode(dn);
+    assertEquals("New_Node", eventPublisher.getLastEvent().getName());
+    DatanodeInfo dni = nsm.getNode(dn);
+    dni.updateLastHeartbeatTime();
+
+    // Ensure node is initially healthy
+    eventPublisher.clearEvents();
+    nsm.checkNodesHealth();
+    assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth());
+    assertNull(eventPublisher.getLastEvent());
+
+    // Set the heartbeat old enough to make it stale
+    dni.updateLastHeartbeatTime(now - staleLimit);
+    nsm.checkNodesHealth();
+    assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth());
+    assertEquals("Stale_Node", eventPublisher.getLastEvent().getName());
+
+    // Now make it dead
+    dni.updateLastHeartbeatTime(now - deadLimit);
+    nsm.checkNodesHealth();
+    assertEquals(NodeState.DEAD, nsm.getNodeStatus(dn).getHealth());
+    assertEquals("Dead_Node", eventPublisher.getLastEvent().getName());
+
+    // Transition back to healthy from dead
+    dni.updateLastHeartbeatTime();
+    nsm.checkNodesHealth();
+    assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth());
+    assertEquals("NON_HEALTHY_TO_HEALTHY_NODE",
+        eventPublisher.getLastEvent().getName());
+
+    // Make the node stale again, and transition to healthy.
+    dni.updateLastHeartbeatTime(now - staleLimit);
+    nsm.checkNodesHealth();
+    assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth());
+    assertEquals("Stale_Node", eventPublisher.getLastEvent().getName());
+    dni.updateLastHeartbeatTime();
+    nsm.checkNodesHealth();
+    assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth());
+    assertEquals("NON_HEALTHY_TO_HEALTHY_NODE",
+        eventPublisher.getLastEvent().getName());
+  }
+
+  private DatanodeDetails generateDatanode() {
+    String uuid = UUID.randomUUID().toString();
+    return DatanodeDetails.newBuilder().setUuid(uuid).build();
+  }
+
+  static class  MockEventPublisher implements EventPublisher {
+
+    private List<Event> events = new ArrayList<>();
+    private List<Object> payloads = new ArrayList<>();
+
+    public void clearEvents() {
+      events.clear();
+      payloads.clear();
+    }
+
+    public List<Event> getEvents() {
+      return events;
+    }
+
+    public Event getLastEvent() {
+      if (events.size() == 0) {
+        return null;
+      } else {
+        return events.get(events.size()-1);
+      }
+    }
+
+    @Override
+    public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void
+        fireEvent(EVENT_TYPE event, PAYLOAD payload) {
+      events.add(event);
+      payloads.add(payload);
+    }
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index d028851..a37142f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -72,9 +72,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
-    .HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
 import static org.junit.Assert.assertEquals;
@@ -238,7 +235,8 @@ public class TestSCMNodeManager {
       }
       //TODO: wait for heartbeat to be processed
       Thread.sleep(4 * 1000);
-      assertEquals(count, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(count, nodeManager.getNodeCount(
+          NodeStatus.inServiceHealthy()));
     }
   }
 
@@ -312,9 +310,10 @@ public class TestSCMNodeManager {
       // Wait for 2 seconds, wait a total of 4 seconds to make sure that the
       // node moves into stale state.
       Thread.sleep(2 * 1000);
-      List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE);
+      List<DatanodeDetails> staleNodeList =
+          nodeManager.getNodes(NodeStatus.inServiceStale());
       assertEquals("Expected to find 1 stale node",
-          1, nodeManager.getNodeCount(STALE));
+          1, nodeManager.getNodeCount(NodeStatus.inServiceStale()));
       assertEquals("Expected to find 1 stale node",
           1, staleNodeList.size());
       assertEquals("Stale node is not the expected ID", staleNode
@@ -331,16 +330,17 @@ public class TestSCMNodeManager {
       Thread.sleep(2 * 1000);
 
       // the stale node has been removed
-      staleNodeList = nodeManager.getNodes(STALE);
+      staleNodeList = nodeManager.getNodes(NodeStatus.inServiceStale());
       assertEquals("Expected to find 1 stale node",
-          0, nodeManager.getNodeCount(STALE));
+          0, nodeManager.getNodeCount(NodeStatus.inServiceStale()));
       assertEquals("Expected to find 1 stale node",
           0, staleNodeList.size());
 
       // Check for the dead node now.
-      List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD);
+      List<DatanodeDetails> deadNodeList =
+          nodeManager.getNodes(NodeStatus.inServiceDead());
       assertEquals("Expected to find 1 dead node", 1,
-          nodeManager.getNodeCount(DEAD));
+          nodeManager.getNodeCount(NodeStatus.inServiceDead()));
       assertEquals("Expected to find 1 dead node",
           1, deadNodeList.size());
       assertEquals("Dead node is not the expected ID", staleNode
@@ -388,8 +388,8 @@ public class TestSCMNodeManager {
 
       //Assert all nodes are healthy.
       assertEquals(2, nodeManager.getAllNodes().size());
-      assertEquals(2, nodeManager.getNodeCount(HEALTHY));
-
+      assertEquals(2,
+          nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
       /**
        * Simulate a JVM Pause and subsequent handling in following steps:
        * Step 1 : stop heartbeat check process for stale node interval
@@ -424,7 +424,7 @@ public class TestSCMNodeManager {
 
       // Step 4 : all nodes should still be HEALTHY
       assertEquals(2, nodeManager.getAllNodes().size());
-      assertEquals(2, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(2, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
 
       // Step 5 : heartbeat for node1
       nodeManager.processHeartbeat(node1);
@@ -433,8 +433,8 @@ public class TestSCMNodeManager {
       Thread.sleep(1000);
 
       // Step 7 : node2 should transition to STALE
-      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
-      assertEquals(1, nodeManager.getNodeCount(STALE));
+      assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
+      assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceStale()));
     }
   }
 
@@ -533,7 +533,7 @@ public class TestSCMNodeManager {
 
       //Assert all nodes are healthy.
       assertEquals(3, nodeManager.getAllNodes().size());
-      assertEquals(3, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
 
       /**
        * Cluster state: Quiesced: We are going to sleep for 3 seconds. Which
@@ -541,7 +541,7 @@ public class TestSCMNodeManager {
        */
       Thread.sleep(3 * 1000);
       assertEquals(3, nodeManager.getAllNodes().size());
-      assertEquals(3, nodeManager.getNodeCount(STALE));
+      assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceStale()));
 
 
       /**
@@ -559,18 +559,19 @@ public class TestSCMNodeManager {
       Thread.sleep(1500);
       nodeManager.processHeartbeat(healthyNode);
       Thread.sleep(2 * 1000);
-      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
 
 
       // 3.5 seconds from last heartbeat for the stale and deadNode. So those
       //  2 nodes must move to Stale state and the healthy node must
       // remain in the healthy State.
-      List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY);
+      List<DatanodeDetails> healthyList = nodeManager.getNodes(
+          NodeStatus.inServiceHealthy());
       assertEquals("Expected one healthy node", 1, healthyList.size());
       assertEquals("Healthy node is not the expected ID", healthyNode
           .getUuid(), healthyList.get(0).getUuid());
 
-      assertEquals(2, nodeManager.getNodeCount(STALE));
+      assertEquals(2, nodeManager.getNodeCount(NodeStatus.inServiceStale()));
 
       /**
        * Cluster State: Allow healthyNode to remain in healthy state and
@@ -586,14 +587,16 @@ public class TestSCMNodeManager {
       // 3.5 seconds have elapsed for stale node, so it moves into Stale.
       // 7 seconds have elapsed for dead node, so it moves into dead.
       // 2 Seconds have elapsed for healthy node, so it stays in healthy state.
-      healthyList = nodeManager.getNodes(HEALTHY);
-      List<DatanodeDetails> staleList = nodeManager.getNodes(STALE);
-      List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
+      healthyList = nodeManager.getNodes((NodeStatus.inServiceHealthy()));
+      List<DatanodeDetails> staleList =
+          nodeManager.getNodes(NodeStatus.inServiceStale());
+      List<DatanodeDetails> deadList =
+          nodeManager.getNodes(NodeStatus.inServiceDead());
 
       assertEquals(3, nodeManager.getAllNodes().size());
-      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
-      assertEquals(1, nodeManager.getNodeCount(STALE));
-      assertEquals(1, nodeManager.getNodeCount(DEAD));
+      assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
+      assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceStale()));
+      assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceDead()));
 
       assertEquals("Expected one healthy node",
           1, healthyList.size());
@@ -619,7 +622,7 @@ public class TestSCMNodeManager {
       Thread.sleep(500);
       //Assert all nodes are healthy.
       assertEquals(3, nodeManager.getAllNodes().size());
-      assertEquals(3, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
     }
   }
 
@@ -668,7 +671,7 @@ public class TestSCMNodeManager {
    */
   private boolean findNodes(NodeManager nodeManager, int count,
       HddsProtos.NodeState state) {
-    return count == nodeManager.getNodeCount(state);
+    return count == nodeManager.getNodeCount(NodeStatus.inServiceStale());
   }
 
   /**
@@ -741,11 +744,14 @@ public class TestSCMNodeManager {
       // Assert all healthy nodes are healthy now, this has to be a greater
       // than check since Stale nodes can be healthy when we check the state.
 
-      assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount);
+      assertTrue(nodeManager.getNodeCount(NodeStatus.inServiceHealthy())
+          >= healthyCount);
 
-      assertEquals(deadCount, nodeManager.getNodeCount(DEAD));
+      assertEquals(deadCount,
+          nodeManager.getNodeCount(NodeStatus.inServiceDead()));
 
-      List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
+      List<DatanodeDetails> deadList =
+          nodeManager.getNodes(NodeStatus.inServiceDead());
 
       for (DatanodeDetails node : deadList) {
         assertTrue(deadNodeList.contains(node));
@@ -861,7 +867,8 @@ public class TestSCMNodeManager {
       }
       //TODO: wait for heartbeat to be processed
       Thread.sleep(4 * 1000);
-      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(nodeCount,
+          nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
       assertEquals(capacity * nodeCount, (long) nodeManager.getStats()
           .getCapacity().get());
       assertEquals(used * nodeCount, (long) nodeManager.getStats()
@@ -953,7 +960,7 @@ public class TestSCMNodeManager {
       // Wait up to 4s so that the node becomes stale
       // Verify the usage info should be unchanged.
       GenericTestUtils.waitFor(
-          () -> nodeManager.getNodeCount(STALE) == 1, 100,
+          () -> nodeManager.getNodeCount(NodeStatus.inServiceStale()) == 1, 
100,
           4 * 1000);
       assertEquals(nodeCount, nodeManager.getNodeStats().size());
 
@@ -971,7 +978,7 @@ public class TestSCMNodeManager {
       // Wait up to 4 more seconds so the node becomes dead
       // Verify usage info should be updated.
       GenericTestUtils.waitFor(
-          () -> nodeManager.getNodeCount(DEAD) == 1, 100,
+          () -> nodeManager.getNodeCount(NodeStatus.inServiceDead()) == 1, 100,
           4 * 1000);
 
       assertEquals(0, nodeManager.getNodeStats().size());
@@ -989,7 +996,7 @@ public class TestSCMNodeManager {
       // Wait up to 5 seconds so that the dead node becomes healthy
       // Verify usage info should be updated.
       GenericTestUtils.waitFor(
-          () -> nodeManager.getNodeCount(HEALTHY) == 1,
+          () -> nodeManager.getNodeCount(NodeStatus.inServiceHealthy()) == 1,
           100, 5 * 1000);
       GenericTestUtils.waitFor(
           () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
@@ -1100,7 +1107,8 @@ public class TestSCMNodeManager {
       // verify network topology cluster has all the registered nodes
       Thread.sleep(4 * 1000);
       NetworkTopology clusterMap = scm.getClusterMap();
-      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(nodeCount,
+          nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
       assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
       assertEquals(4, clusterMap.getMaxLevel());
       List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
@@ -1142,7 +1150,8 @@ public class TestSCMNodeManager {
       // verify network topology cluster has all the registered nodes
       Thread.sleep(4 * 1000);
       NetworkTopology clusterMap = scm.getClusterMap();
-      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(nodeCount,
+          nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
       assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
       assertEquals(3, clusterMap.getMaxLevel());
       List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
new file mode 100644
index 0000000..482f444
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * Class to test the NodeStateMap class, which is an internal class used by
+ * NodeStateManager.
+ */
+
+public class TestNodeStateMap {
+
+  private NodeStateMap map;
+
+  @Before
+  public void setUp() {
+    map = new NodeStateMap();
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  @Test
+  public void testNodeCanBeAddedAndRetrieved()
+      throws NodeAlreadyExistsException, NodeNotFoundException {
+    DatanodeDetails dn = generateDatanode();
+    NodeStatus status = NodeStatus.inServiceHealthy();
+    map.addNode(dn, status);
+    assertEquals(dn, map.getNodeInfo(dn.getUuid()));
+    assertEquals(status, map.getNodeStatus(dn.getUuid()));
+  }
+
+  @Test
+  public void testNodeHealthStateCanBeUpdated()
+      throws NodeAlreadyExistsException, NodeNotFoundException {
+    DatanodeDetails dn = generateDatanode();
+    NodeStatus status = NodeStatus.inServiceHealthy();
+    map.addNode(dn, status);
+
+    NodeStatus expectedStatus = NodeStatus.inServiceStale();
+    NodeStatus returnedStatus =
+        map.updateNodeHealthState(dn.getUuid(), expectedStatus.getHealth());
+    assertEquals(expectedStatus, returnedStatus);
+    assertEquals(returnedStatus, map.getNodeStatus(dn.getUuid()));
+  }
+
+  @Test
+  public void testNodeOperationalStateCanBeUpdated()
+      throws NodeAlreadyExistsException, NodeNotFoundException {
+    DatanodeDetails dn = generateDatanode();
+    NodeStatus status = NodeStatus.inServiceHealthy();
+    map.addNode(dn, status);
+
+    NodeStatus expectedStatus = new NodeStatus(
+        NodeOperationalState.DECOMMISSIONING,
+        NodeState.HEALTHY);
+    NodeStatus returnedStatus = map.updateNodeOperationalState(
+        dn.getUuid(), expectedStatus.getOperationalState());
+    assertEquals(expectedStatus, returnedStatus);
+    assertEquals(returnedStatus, map.getNodeStatus(dn.getUuid()));
+  }
+
+  @Test
+  public void testGetNodeMethodsReturnCorrectCountsAndStates()
+      throws NodeAlreadyExistsException {
+    // Add one node for all possible states
+    int nodeCount = 0;
+    for(NodeOperationalState op : NodeOperationalState.values()) {
+      for(NodeState health : NodeState.values()) {
+        addRandomNodeWithState(op, health);
+        nodeCount++;
+      }
+    }
+    NodeStatus requestedState = NodeStatus.inServiceStale();
+    List<UUID> nodes = map.getNodes(requestedState);
+    assertEquals(1, nodes.size());
+    assertEquals(1, map.getNodeCount(requestedState));
+    assertEquals(nodeCount, map.getTotalNodeCount());
+    assertEquals(nodeCount, map.getAllNodes().size());
+    assertEquals(nodeCount, map.getAllDatanodeInfos().size());
+
+    // Checks for the getNodeCount(opstate, health) method
+    assertEquals(nodeCount, map.getNodeCount(null, null));
+    assertEquals(1,
+        map.getNodeCount(NodeOperationalState.DECOMMISSIONING,
+            NodeState.STALE));
+    assertEquals(5, map.getNodeCount(null, NodeState.HEALTHY));
+    assertEquals(3,
+        map.getNodeCount(NodeOperationalState.DECOMMISSIONING, null));
+  }
+
+  private void addNodeWithState(DatanodeDetails dn,
+      NodeOperationalState opState, NodeState health)
+      throws NodeAlreadyExistsException {
+    NodeStatus status = new NodeStatus(opState, health);
+    map.addNode(dn, status);
+  }
+
+  private void addRandomNodeWithState(
+      NodeOperationalState opState, NodeState health)
+      throws NodeAlreadyExistsException {
+    DatanodeDetails dn = generateDatanode();
+    addNodeWithState(dn, opState, health);
+  }
+
+  private DatanodeDetails generateDatanode() {
+    String uuid = UUID.randomUUID().toString();
+    return DatanodeDetails.newBuilder().setUuid(uuid).build();
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
index f0b1cbb..2ac5f70 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
@@ -26,6 +26,7 @@ import 
org.apache.hadoop.hdds.scm.container.placement.algorithms
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.junit.Assert;
 import org.junit.Test;
@@ -34,8 +35,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
-    .HEALTHY;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -45,7 +44,8 @@ public class TestContainerPlacement {
 
   private DescriptiveStatistics computeStatistics(NodeManager nodeManager) {
     DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
-    for (DatanodeDetails dd : nodeManager.getNodes(HEALTHY)) {
+    for (DatanodeDetails dd :
+        nodeManager.getNodes(NodeStatus.inServiceHealthy())) {
       float weightedValue =
           nodeManager.getNodeStat(dd).get().getScmUsed().get() / (float)
               nodeManager.getNodeStat(dd).get().getCapacity().get();
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 30a75ef..b584f3f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -17,9 +17,11 @@
 package org.apache.hadoop.ozone.container.testutils;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -81,22 +83,48 @@ public class ReplicationNodeManagerMock implements 
NodeManager {
   /**
    * Gets all Live Datanodes that is currently communicating with SCM.
    *
-   * @param nodestate - State of the node
+   * @param nodestatus - State of the node
    * @return List of Datanodes that are Heartbeating SCM.
    */
   @Override
-  public List<DatanodeDetails> getNodes(NodeState nodestate) {
+  public List<DatanodeDetails> getNodes(NodeStatus nodestatus) {
     return null;
   }
 
   /**
+   * Gets all Live Datanodes that is currently communicating with SCM.
+   *
+   * @param opState - Operational state of the node
+   * @param health - Health of the node
+   * @return List of Datanodes that are Heartbeating SCM.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(
+      HddsProtos.NodeOperationalState opState, NodeState health) {
+    return null;
+  }
+
+  /**
+   * Returns the Number of Datanodes that are communicating with SCM.
+   *
+   * @param nodestatus - State of the node
+   * @return int -- count
+   */
+  @Override
+  public int getNodeCount(NodeStatus nodestatus) {
+    return 0;
+  }
+
+  /**
    * Returns the Number of Datanodes that are communicating with SCM.
    *
-   * @param nodestate - State of the node
+   * @param opState - Operational state of the node
+   * @param health - Health of the node
    * @return int -- count
    */
   @Override
-  public int getNodeCount(NodeState nodestate) {
+  public int getNodeCount(
+      HddsProtos.NodeOperationalState opState, NodeState health) {
     return 0;
   }
 
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java
index 7de2e4b..42773f8 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.hdds.scm.client.ScmClient;
 import picocli.CommandLine;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONED;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
 
@@ -57,8 +55,6 @@ public class TopologySubcommand implements Callable<Void> {
     stateArray.add(HEALTHY);
     stateArray.add(STALE);
     stateArray.add(DEAD);
-    stateArray.add(DECOMMISSIONING);
-    stateArray.add(DECOMMISSIONED);
   }
 
   @CommandLine.Option(names = {"-o", "--order"},
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index ba072f8..225281a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -66,6 +65,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -366,8 +366,7 @@ public class TestStorageContainerManager {
         NodeManager nodeManager = cluster.getStorageContainerManager()
             .getScmNodeManager();
         List<SCMCommand> commands = nodeManager.processHeartbeat(
-            nodeManager.getNodes(NodeState.HEALTHY).get(0));
-
+            nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0));
         if (commands != null) {
           for (SCMCommand cmd : commands) {
             if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
index 65a6357..98a22a2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -149,10 +149,6 @@ public class TestSCMNodeMetrics {
         getMetrics(SCMNodeMetrics.class.getSimpleName()));
     assertGauge("DeadNodes", 0,
         getMetrics(SCMNodeMetrics.class.getSimpleName()));
-    assertGauge("DecommissioningNodes", 0,
-        getMetrics(SCMNodeMetrics.class.getSimpleName()));
-    assertGauge("DecommissionedNodes", 0,
-        getMetrics(SCMNodeMetrics.class.getSimpleName()));
     assertGauge("DiskCapacity", 100L,
         getMetrics(SCMNodeMetrics.class.getSimpleName()));
     assertGauge("DiskUsed", 10L,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to