This is an automated email from the ASF dual-hosted git repository. aengineer pushed a commit to branch HDDS-1880-Decom in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push: new ee8f24c HDDS-1982. Extend SCMNodeManager to support decommission and maintenance states. Contributed by Stephen O'Donnell. ee8f24c is described below commit ee8f24ca4fcf364b12ae782beed790c7653bdafc Author: Anu Engineer <aengin...@apache.org> AuthorDate: Fri Sep 20 11:42:21 2019 -0700 HDDS-1982. Extend SCMNodeManager to support decommission and maintenance states. Contributed by Stephen O'Donnell. --- hadoop-hdds/common/src/main/proto/hdds.proto | 10 +- .../hdds/scm/block/SCMBlockDeletingService.java | 7 +- .../placement/algorithms/SCMCommonPolicy.java | 4 +- .../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 50 +++- .../apache/hadoop/hdds/scm/node/NodeManager.java | 31 ++- .../hadoop/hdds/scm/node/NodeStateManager.java | 291 +++++++++++---------- .../apache/hadoop/hdds/scm/node/NodeStatus.java | 93 +++++++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 59 ++++- .../hadoop/hdds/scm/node/SCMNodeMetrics.java | 8 - .../hadoop/hdds/scm/node/states/NodeStateMap.java | 241 ++++++++++++----- .../hdds/scm/pipeline/RatisPipelineProvider.java | 4 +- .../hdds/scm/pipeline/SimplePipelineProvider.java | 4 +- .../hdds/scm/server/SCMClientProtocolServer.java | 4 +- .../hdds/scm/server/StorageContainerManager.java | 3 +- .../hadoop/hdds/scm/block/TestBlockManager.java | 3 +- .../hadoop/hdds/scm/container/MockNodeManager.java | 37 ++- .../scm/container/TestContainerReportHandler.java | 12 +- .../algorithms/TestContainerPlacementFactory.java | 4 +- .../TestSCMContainerPlacementCapacity.java | 4 +- .../TestSCMContainerPlacementRackAware.java | 4 +- .../TestSCMContainerPlacementRandom.java | 4 +- .../hdds/scm/node/TestContainerPlacement.java | 2 +- .../hadoop/hdds/scm/node/TestNodeStateManager.java | 223 ++++++++++++++++ .../hadoop/hdds/scm/node/TestSCMNodeManager.java | 83 +++--- .../hdds/scm/node/states/TestNodeStateMap.java | 140 ++++++++++ .../placement/TestContainerPlacement.java | 6 +- 
.../testutils/ReplicationNodeManagerMock.java | 36 ++- .../hadoop/hdds/scm/cli/TopologySubcommand.java | 4 - .../hadoop/ozone/TestStorageContainerManager.java | 5 +- .../hadoop/ozone/scm/node/TestSCMNodeMetrics.java | 4 - 30 files changed, 1063 insertions(+), 317 deletions(-) diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index d2bb355..294f2b7 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -100,8 +100,14 @@ enum NodeState { HEALTHY = 1; STALE = 2; DEAD = 3; - DECOMMISSIONING = 4; - DECOMMISSIONED = 5; +} + +enum NodeOperationalState { + IN_SERVICE = 1; + DECOMMISSIONING = 2; + DECOMMISSIONED = 3; + ENTERING_MAINTENANCE = 4; + IN_MAINTENANCE = 5; } enum QueryScope { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index ad77624..b5e5d16 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -23,9 +23,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; @@ -137,7 +137,10 @@ public class SCMBlockDeletingService extends 
BackgroundService { // to delete blocks. LOG.debug("Running DeletedBlockTransactionScanner"); DatanodeDeletedBlockTransactions transactions = null; - List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + // TODO - DECOMM - should we be deleting blocks from decom nodes + // and what about entering maintenance. + List<DatanodeDetails> datanodes = + nodeManager.getNodes(NodeStatus.inServiceHealthy()); Map<Long, Long> transactionMap = null; if (datanodes != null) { transactions = new DatanodeDeletedBlockTransactions(containerManager, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java index 77cdd83..63cd8e1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,7 +108,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes, int nodesRequired, final long sizeRequired) throws SCMException { List<DatanodeDetails> healthyNodes = - nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + nodeManager.getNodes(NodeStatus.inServiceHealthy()); if (excludedNodes != null) { healthyNodes.removeAll(excludedNodes); } diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index d06ea2a..66e1ca4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.node; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; @@ -41,25 +42,39 @@ public class DatanodeInfo extends DatanodeDetails { private List<StorageReportProto> storageReports; + private NodeStatus nodeStatus; + /** * Constructs DatanodeInfo from DatanodeDetails. * * @param datanodeDetails Details about the datanode */ - public DatanodeInfo(DatanodeDetails datanodeDetails) { + public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus) { super(datanodeDetails); this.lock = new ReentrantReadWriteLock(); this.lastHeartbeatTime = Time.monotonicNow(); this.storageReports = Collections.emptyList(); + this.nodeStatus = nodeStatus; } /** * Updates the last heartbeat time with current time. */ public void updateLastHeartbeatTime() { + updateLastHeartbeatTime(Time.monotonicNow()); + } + + /** + * Sets the last heartbeat time to a given value. Intended to be used + * only for tests. + * + * @param milliSecondsSinceEpoch - ms since Epoch to set as the heartbeat time + */ + @VisibleForTesting + public void updateLastHeartbeatTime(long milliSecondsSinceEpoch) { try { lock.writeLock().lock(); - lastHeartbeatTime = Time.monotonicNow(); + lastHeartbeatTime = milliSecondsSinceEpoch; } finally { lock.writeLock().unlock(); } @@ -109,6 +124,37 @@ public class DatanodeInfo extends DatanodeDetails { } /** + * Return the current NodeStatus for the datanode. 
+ * + * @return NodeStatus - the current nodeStatus + */ + public NodeStatus getNodeStatus() { + try { + lock.readLock().lock(); + return nodeStatus; + } finally { + lock.readLock().unlock(); + } + } + + /** + * Update the NodeStatus for this datanode. When using this method + * beware of the potential for lost updates if two threads read the + * current status, update one field and then write it back without + * locking enforced outside of this class. + * + * @param newNodeStatus - the new NodeStatus object + */ + public void setNodeStatus(NodeStatus newNodeStatus) { + try { + lock.writeLock().lock(); + this.nodeStatus = newNodeStatus; + } finally { + lock.writeLock().unlock(); + } + } + + /** * Returns the last updated time of datanode info. * @return the last updated time of datanode info. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index d8890fb..afff7a3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -63,18 +64,38 @@ public interface NodeManager extends StorageContainerNodeProtocol, EventHandler<CommandForDatanode>, NodeManagerMXBean, Closeable { /** + * Gets all Live Datanodes that are currently communicating with SCM.
+ * @param nodeStatus - Status of the node to return + * @return List of Datanodes that are Heartbeating SCM. + */ + List<DatanodeDetails> getNodes(NodeStatus nodeStatus); + + /** * Gets all Live Datanodes that is currently communicating with SCM. - * @param nodeState - State of the node + * @param opState - The operational state of the node + * @param health - The health of the node * @return List of Datanodes that are Heartbeating SCM. */ - List<DatanodeDetails> getNodes(NodeState nodeState); + List<DatanodeDetails> getNodes( + NodeOperationalState opState, NodeState health); + + /** + * Returns the Number of Datanodes that are communicating with SCM with the + * given status. + * @param nodeStatus - State of the node + * @return int -- count + */ + int getNodeCount(NodeStatus nodeStatus); /** - * Returns the Number of Datanodes that are communicating with SCM. - * @param nodeState - State of the node + * Returns the Number of Datanodes that are communicating with SCM in the + * given state. + * @param opState - The operational state of the node + * @param health - The health of the node * @return int -- count */ - int getNodeCount(NodeState nodeState); + int getNodeCount( + NodeOperationalState opState, NodeState health); /** * Get all datanodes known to SCM. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 954cb0e..1e1a50c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -23,7 +23,9 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -75,7 +77,12 @@ public class NodeStateManager implements Runnable, Closeable { * Node's life cycle events. */ private enum NodeLifeCycleEvent { - TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED + TIMEOUT, RESTORE, RESURRECT + } + + private enum NodeOperationStateEvent { + START_DECOMMISSION, COMPLETE_DECOMMISSION, START_MAINTENANCE, + ENTER_MAINTENANCE, RETURN_TO_SERVICE } private static final Logger LOG = LoggerFactory @@ -84,7 +91,12 @@ public class NodeStateManager implements Runnable, Closeable { /** * StateMachine for node lifecycle. */ - private final StateMachine<NodeState, NodeLifeCycleEvent> stateMachine; + private final StateMachine<NodeState, NodeLifeCycleEvent> nodeHealthSM; + /** + * StateMachine for node operational state. + */ + private final StateMachine<HddsProtos.NodeOperationalState, + NodeOperationStateEvent> nodeOpStateSM; /** * This is the map which maintains the current state of all datanodes. 
*/ @@ -100,7 +112,7 @@ public class NodeStateManager implements Runnable, Closeable { /** * Maps the event to be triggered when a node state us updated. */ - private final Map<NodeState, Event<DatanodeDetails>> state2EventMap; + private final Map<NodeStatus, Event<DatanodeDetails>> state2EventMap; /** * ExecutorService used for scheduling heartbeat processing thread. */ @@ -150,9 +162,11 @@ public class NodeStateManager implements Runnable, Closeable { this.state2EventMap = new HashMap<>(); initialiseState2EventMap(); Set<NodeState> finalStates = new HashSet<>(); - finalStates.add(NodeState.DECOMMISSIONED); - this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates); - initializeStateMachine(); + Set<HddsProtos.NodeOperationalState> opStateFinalStates = new HashSet<>(); + this.nodeHealthSM = new StateMachine<>(NodeState.HEALTHY, finalStates); + this.nodeOpStateSM = new StateMachine<>( + NodeOperationalState.IN_SERVICE, opStateFinalStates); + initializeStateMachines(); heartbeatCheckerIntervalMs = HddsServerUtil .getScmheartbeatCheckerInterval(conf); staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf); @@ -176,10 +190,12 @@ public class NodeStateManager implements Runnable, Closeable { * Populates state2event map. 
*/ private void initialiseState2EventMap() { - state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE); - state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE); - state2EventMap - .put(NodeState.HEALTHY, SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE); + state2EventMap.put(NodeStatus.inServiceStale(), SCMEvents.STALE_NODE); + state2EventMap.put(NodeStatus.inServiceDead(), SCMEvents.DEAD_NODE); + state2EventMap.put(NodeStatus.inServiceHealthy(), + SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE); + // TODO - add whatever events are needed for decomm / maint to stale, dead, + // healthy } /* @@ -198,18 +214,6 @@ public class NodeStateManager implements Runnable, Closeable { * State: DEAD -------------------> HEALTHY * Event: RESURRECT * - * State: HEALTHY -------------------> DECOMMISSIONING - * Event: DECOMMISSION - * - * State: STALE -------------------> DECOMMISSIONING - * Event: DECOMMISSION - * - * State: DEAD -------------------> DECOMMISSIONING - * Event: DECOMMISSION - * - * State: DECOMMISSIONING -------------------> DECOMMISSIONED - * Event: DECOMMISSIONED - * * Node State Flow * * +--------------------------------------------------------+ @@ -219,47 +223,51 @@ public class NodeStateManager implements Runnable, Closeable { * | | | | * V V | | * [HEALTHY]------------------->[STALE]------------------->[DEAD] - * | (TIMEOUT) | (TIMEOUT) | - * | | | - * | | | - * | | | - * | | | - * | (DECOMMISSION) | (DECOMMISSION) | (DECOMMISSION) - * | V | - * +------------------->[DECOMMISSIONING]<----------------+ - * | - * | (DECOMMISSIONED) - * | - * V - * [DECOMMISSIONED] * */ /** * Initializes the lifecycle of node state machine. 
*/ - private void initializeStateMachine() { - stateMachine.addTransition( + private void initializeStateMachines() { + nodeHealthSM.addTransition( NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT); - stateMachine.addTransition( + nodeHealthSM.addTransition( NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT); - stateMachine.addTransition( + nodeHealthSM.addTransition( NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE); - stateMachine.addTransition( + nodeHealthSM.addTransition( NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT); - stateMachine.addTransition( - NodeState.HEALTHY, NodeState.DECOMMISSIONING, - NodeLifeCycleEvent.DECOMMISSION); - stateMachine.addTransition( - NodeState.STALE, NodeState.DECOMMISSIONING, - NodeLifeCycleEvent.DECOMMISSION); - stateMachine.addTransition( - NodeState.DEAD, NodeState.DECOMMISSIONING, - NodeLifeCycleEvent.DECOMMISSION); - stateMachine.addTransition( - NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED, - NodeLifeCycleEvent.DECOMMISSIONED); + nodeOpStateSM.addTransition( + NodeOperationalState.IN_SERVICE, NodeOperationalState.DECOMMISSIONING, + NodeOperationStateEvent.START_DECOMMISSION); + nodeOpStateSM.addTransition( + NodeOperationalState.DECOMMISSIONING, NodeOperationalState.IN_SERVICE, + NodeOperationStateEvent.RETURN_TO_SERVICE); + nodeOpStateSM.addTransition( + NodeOperationalState.DECOMMISSIONING, + NodeOperationalState.DECOMMISSIONED, + NodeOperationStateEvent.COMPLETE_DECOMMISSION); + nodeOpStateSM.addTransition( + NodeOperationalState.DECOMMISSIONED, NodeOperationalState.IN_SERVICE, + NodeOperationStateEvent.RETURN_TO_SERVICE); + + nodeOpStateSM.addTransition( + NodeOperationalState.IN_SERVICE, + NodeOperationalState.ENTERING_MAINTENANCE, + NodeOperationStateEvent.START_MAINTENANCE); + nodeOpStateSM.addTransition( + NodeOperationalState.ENTERING_MAINTENANCE, + NodeOperationalState.IN_SERVICE, + NodeOperationStateEvent.RETURN_TO_SERVICE); + 
nodeOpStateSM.addTransition( + NodeOperationalState.ENTERING_MAINTENANCE, + NodeOperationalState.IN_MAINTENANCE, + NodeOperationStateEvent.ENTER_MAINTENANCE); + nodeOpStateSM.addTransition( + NodeOperationalState.IN_MAINTENANCE, NodeOperationalState.IN_SERVICE, + NodeOperationStateEvent.RETURN_TO_SERVICE); } /** @@ -271,7 +279,8 @@ public class NodeStateManager implements Runnable, Closeable { */ public void addNode(DatanodeDetails datanodeDetails) throws NodeAlreadyExistsException { - nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState()); + nodeStateMap.addNode(datanodeDetails, new NodeStatus( + nodeOpStateSM.getInitialState(), nodeHealthSM.getInitialState())); eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails); } @@ -317,59 +326,63 @@ public class NodeStateManager implements Runnable, Closeable { * * @throws NodeNotFoundException if the node is not present */ - public NodeState getNodeState(DatanodeDetails datanodeDetails) + public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) throws NodeNotFoundException { - return nodeStateMap.getNodeState(datanodeDetails.getUuid()); + return nodeStateMap.getNodeStatus(datanodeDetails.getUuid()); } /** - * Returns all the node which are in healthy state. + * Returns all the node which are in healthy state, ignoring the operational + * state. * * @return list of healthy nodes */ public List<DatanodeInfo> getHealthyNodes() { - return getNodes(NodeState.HEALTHY); + return getNodes(null, NodeState.HEALTHY); } /** - * Returns all the node which are in stale state. + * Returns all the node which are in stale state, ignoring the operational + * state. * * @return list of stale nodes */ public List<DatanodeInfo> getStaleNodes() { - return getNodes(NodeState.STALE); + return getNodes(null, NodeState.STALE); } /** - * Returns all the node which are in dead state. + * Returns all the node which are in dead state, ignoring the operational + * state. 
* * @return list of dead nodes */ public List<DatanodeInfo> getDeadNodes() { - return getNodes(NodeState.DEAD); + return getNodes(null, NodeState.DEAD); } /** - * Returns all the node which are in the specified state. + * Returns all the nodes with the specified status. * - * @param state NodeState + * @param status NodeStatus * * @return list of nodes */ - public List<DatanodeInfo> getNodes(NodeState state) { - List<DatanodeInfo> nodes = new ArrayList<>(); - nodeStateMap.getNodes(state).forEach( - uuid -> { - try { - nodes.add(nodeStateMap.getNodeInfo(uuid)); - } catch (NodeNotFoundException e) { - // This should not happen unless someone else other than - // NodeStateManager is directly modifying NodeStateMap and removed - // the node entry after we got the list of UUIDs. - LOG.error("Inconsistent NodeStateMap! " + nodeStateMap); - } - }); - return nodes; + public List<DatanodeInfo> getNodes(NodeStatus status) { + return nodeStateMap.getDatanodeInfos(status); + } + + /** + * Returns all the nodes with the specified operationalState and health. + * + * @param opState The operationalState of the node + * @param health The node health + * + * @return list of nodes matching the passed states + */ + public List<DatanodeInfo> getNodes( + NodeOperationalState opState, NodeState health) { + return nodeStateMap.getDatanodeInfos(opState, health); } /** @@ -378,19 +391,7 @@ public class NodeStateManager implements Runnable, Closeable { * @return all the managed nodes */ public List<DatanodeInfo> getAllNodes() { - List<DatanodeInfo> nodes = new ArrayList<>(); - nodeStateMap.getAllNodes().forEach( - uuid -> { - try { - nodes.add(nodeStateMap.getNodeInfo(uuid)); - } catch (NodeNotFoundException e) { - // This should not happen unless someone else other than - // NodeStateManager is directly modifying NodeStateMap and removed - // the node entry after we got the list of UUIDs. - LOG.error("Inconsistent NodeStateMap! 
" + nodeStateMap); - } - }); - return nodes; + return nodeStateMap.getAllDatanodeInfos(); } /** @@ -403,41 +404,53 @@ public class NodeStateManager implements Runnable, Closeable { } /** - * Returns the count of healthy nodes. + * Returns the count of healthy nodes, ignoring operational state. * * @return healthy node count */ public int getHealthyNodeCount() { - return getNodeCount(NodeState.HEALTHY); + return getHealthyNodes().size(); } /** - * Returns the count of stale nodes. + * Returns the count of stale nodes, ignoring operational state. * * @return stale node count */ public int getStaleNodeCount() { - return getNodeCount(NodeState.STALE); + return getStaleNodes().size(); } /** - * Returns the count of dead nodes. + * Returns the count of dead nodes, ignoring operational state. * * @return dead node count */ public int getDeadNodeCount() { - return getNodeCount(NodeState.DEAD); + return getDeadNodes().size(); } /** - * Returns the count of nodes in specified state. + * Returns the count of nodes in specified status. * - * @param state NodeState + * @param status NodeState * * @return node count */ - public int getNodeCount(NodeState state) { - return nodeStateMap.getNodeCount(state); + public int getNodeCount(NodeStatus status) { + return nodeStateMap.getNodeCount(status); + } + + /** + * Returns the count of nodes in the specified states. 
+ * + * @param opState The operational state of the node + * @param health The health of the node + * + * @return node count + */ + public int getNodeCount(NodeOperationalState opState, NodeState health) { + return nodeStateMap.getNodeCount(opState, health); } /** @@ -536,7 +549,8 @@ public class NodeStateManager implements Runnable, Closeable { scheduleNextHealthCheck(); } - private void checkNodesHealth() { + @VisibleForTesting + public void checkNodesHealth() { /* * @@ -578,39 +592,32 @@ public class NodeStateManager implements Runnable, Closeable { Predicate<Long> deadNodeCondition = (lastHbTime) -> lastHbTime < staleNodeDeadline; try { - for (NodeState state : NodeState.values()) { - List<UUID> nodes = nodeStateMap.getNodes(state); - for (UUID id : nodes) { - DatanodeInfo node = nodeStateMap.getNodeInfo(id); - switch (state) { - case HEALTHY: - // Move the node to STALE if the last heartbeat time is less than - // configured stale-node interval. - updateNodeState(node, staleNodeCondition, state, - NodeLifeCycleEvent.TIMEOUT); - break; - case STALE: - // Move the node to DEAD if the last heartbeat time is less than - // configured dead-node interval. - updateNodeState(node, deadNodeCondition, state, - NodeLifeCycleEvent.TIMEOUT); - // Restore the node if we have received heartbeat before configured - // stale-node interval. - updateNodeState(node, healthyNodeCondition, state, - NodeLifeCycleEvent.RESTORE); - break; - case DEAD: - // Resurrect the node if we have received heartbeat before - // configured stale-node interval. - updateNodeState(node, healthyNodeCondition, state, - NodeLifeCycleEvent.RESURRECT); - break; - // We don't do anything for DECOMMISSIONING and DECOMMISSIONED in - // heartbeat processing. 
- case DECOMMISSIONING: - case DECOMMISSIONED: - default: - } + for(DatanodeInfo node : nodeStateMap.getAllDatanodeInfos()) { + NodeStatus status = nodeStateMap.getNodeStatus(node.getUuid()); + switch (status.getHealth()) { + case HEALTHY: + // Move the node to STALE if the last heartbeat time is less than + // configured stale-node interval. + updateNodeState(node, staleNodeCondition, status, + NodeLifeCycleEvent.TIMEOUT); + break; + case STALE: + // Move the node to DEAD if the last heartbeat time is less than + // configured dead-node interval. + updateNodeState(node, deadNodeCondition, status, + NodeLifeCycleEvent.TIMEOUT); + // Restore the node if we have received heartbeat before configured + // stale-node interval. + updateNodeState(node, healthyNodeCondition, status, + NodeLifeCycleEvent.RESTORE); + break; + case DEAD: + // Resurrect the node if we have received heartbeat before + // configured stale-node interval. + updateNodeState(node, healthyNodeCondition, status, + NodeLifeCycleEvent.RESURRECT); + break; + default: } } } catch (NodeNotFoundException e) { @@ -669,27 +676,29 @@ public class NodeStateManager implements Runnable, Closeable { * * @param node DatanodeInfo * @param condition condition to check - * @param state current state of node + * @param status current status of node * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition * matches * * @throws NodeNotFoundException if the node is not present */ private void updateNodeState(DatanodeInfo node, Predicate<Long> condition, - NodeState state, NodeLifeCycleEvent lifeCycleEvent) + NodeStatus status, NodeLifeCycleEvent lifeCycleEvent) throws NodeNotFoundException { try { if (condition.test(node.getLastHeartbeatTime())) { - NodeState newState = stateMachine.getNextState(state, lifeCycleEvent); - nodeStateMap.updateNodeState(node.getUuid(), state, newState); - if (state2EventMap.containsKey(newState)) { - eventPublisher.fireEvent(state2EventMap.get(newState), node); + NodeState 
newHealthState = nodeHealthSM. + getNextState(status.getHealth(), lifeCycleEvent); + NodeStatus newStatus = + nodeStateMap.updateNodeHealthState(node.getUuid(), newHealthState); + if (state2EventMap.containsKey(newStatus)) { + eventPublisher.fireEvent(state2EventMap.get(newStatus), node); } } } catch (InvalidStateTransitionException e) { LOG.warn("Invalid state transition of node {}." + " Current state: {}, life cycle event: {}", - node, state, lifeCycleEvent); + node, status.getHealth(), lifeCycleEvent); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java new file mode 100644 index 0000000..0776c28 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.util.Objects; + +/** + * This class is used to capture the current status of a datanode. 
This + * includes its health (healthy, stale or dead) and its operation status ( + * in_service, decommissioned and maintenance mode. + */ +public class NodeStatus { + + private HddsProtos.NodeOperationalState operationalState; + private HddsProtos.NodeState health; + + public NodeStatus(HddsProtos.NodeOperationalState operationalState, + HddsProtos.NodeState health) { + this.operationalState = operationalState; + this.health = health; + } + + public static NodeStatus inServiceHealthy() { + return new NodeStatus(HddsProtos.NodeOperationalState.IN_SERVICE, + HddsProtos.NodeState.HEALTHY); + } + + public static NodeStatus inServiceStale() { + return new NodeStatus(HddsProtos.NodeOperationalState.IN_SERVICE, + HddsProtos.NodeState.STALE); + } + + public static NodeStatus inServiceDead() { + return new NodeStatus(HddsProtos.NodeOperationalState.IN_SERVICE, + HddsProtos.NodeState.DEAD); + } + + public HddsProtos.NodeState getHealth() { + return health; + } + + public HddsProtos.NodeOperationalState getOperationalState() { + return operationalState; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + NodeStatus other = (NodeStatus) obj; + if (this.operationalState == other.operationalState && + this.health == other.health) { + return true; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(health, operationalState); + } + + @Override + public String toString() { + return "OperationalState: "+operationalState+" Health: "+health; + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index d3df858..c277ea9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; @@ -150,8 +151,28 @@ public class SCMNodeManager implements NodeManager { * @return List of Datanodes that are known to SCM in the requested state. */ @Override - public List<DatanodeDetails> getNodes(NodeState nodestate) { - return nodeStateManager.getNodes(nodestate).stream() + public List<DatanodeDetails> getNodes(NodeStatus nodeStatus) { + return nodeStateManager.getNodes(nodeStatus) + .stream() + .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); + } + + /** + * Returns all datanodes that are in the given states. Passing null for either + * of the states acts like a wildcard for that state. This function works by + * taking a snapshot of the current collection and then returning the list + * from that collection. This means that the real map might have changed by the + * time we return this list. + * + * @param opState The operational state of the node + * @param health The health of the node + * @return List of Datanodes that are known to SCM in the requested states.
+ */ + @Override + public List<DatanodeDetails> getNodes( + NodeOperationalState opState, NodeState health) { + return nodeStateManager.getNodes(opState, health) + .stream() .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); } @@ -172,8 +193,21 @@ public class SCMNodeManager implements NodeManager { * @return count */ @Override - public int getNodeCount(NodeState nodestate) { - return nodeStateManager.getNodeCount(nodestate); + public int getNodeCount(NodeStatus nodeStatus) { + return nodeStateManager.getNodeCount(nodeStatus); + } + + /** + * Returns the Number of Datanodes by State they are in. Passing null for + * either of the states acts like a wildcard for that state. + * + * @parem nodeOpState - The Operational State of the node + * @param health - The health of the node + * @return count + */ + @Override + public int getNodeCount(NodeOperationalState nodeOpState, NodeState health) { + return nodeStateManager.getNodeCount(nodeOpState, health); } /** @@ -185,7 +219,7 @@ public class SCMNodeManager implements NodeManager { @Override public NodeState getNodeState(DatanodeDetails datanodeDetails) { try { - return nodeStateManager.getNodeState(datanodeDetails); + return nodeStateManager.getNodeStatus(datanodeDetails).getHealth(); } catch (NodeNotFoundException e) { // TODO: should we throw NodeNotFoundException? 
return null; @@ -365,9 +399,9 @@ public class SCMNodeManager implements NodeManager { final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>(); final List<DatanodeInfo> healthyNodes = nodeStateManager - .getNodes(NodeState.HEALTHY); + .getHealthyNodes(); final List<DatanodeInfo> staleNodes = nodeStateManager - .getNodes(NodeState.STALE); + .getStaleNodes(); final List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes); datanodes.addAll(staleNodes); @@ -417,9 +451,12 @@ public class SCMNodeManager implements NodeManager { @Override public Map<String, Integer> getNodeCount() { + // TODO - This does not consider decom, maint etc. Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); for(NodeState state : NodeState.values()) { - nodeCountMap.put(state.toString(), getNodeCount(state)); + // TODO - this iterate the node list once per state and needs + // fixed to only perform one pass. + nodeCountMap.put(state.toString(), getNodeCount(null, state)); } return nodeCountMap; } @@ -436,10 +473,8 @@ public class SCMNodeManager implements NodeManager { long ssdUsed = 0L; long ssdRemaining = 0L; - List<DatanodeInfo> healthyNodes = nodeStateManager - .getNodes(NodeState.HEALTHY); - List<DatanodeInfo> staleNodes = nodeStateManager - .getNodes(NodeState.STALE); + List<DatanodeInfo> healthyNodes = nodeStateManager.getHealthyNodes(); + List<DatanodeInfo> staleNodes = nodeStateManager.getStaleNodes(); List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes); datanodes.addAll(staleNodes); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java index 1596523..fd9c9c1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hdds.scm.node; 
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONED; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONING; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; @@ -132,12 +130,6 @@ public final class SCMNodeMetrics implements MetricsSource { .addGauge(Interns.info("DeadNodes", "Number of dead datanodes"), nodeCount.get(DEAD.toString())) - .addGauge(Interns.info("DecommissioningNodes", - "Number of decommissioning datanodes"), - nodeCount.get(DECOMMISSIONING.toString())) - .addGauge(Interns.info("DecommissionedNodes", - "Number of decommissioned datanodes"), - nodeCount.get(DECOMMISSIONED.toString())) .addGauge(Interns.info("DiskCapacity", "Total disk capacity"), nodeInfo.get("DISKCapacity")) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java index 0c1ab2c..6565e81 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java @@ -20,13 +20,16 @@ package org.apache.hadoop.hdds.scm.node.states; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * Maintains the state of datanodes in SCM. This class should only be used by @@ -35,16 +38,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * this class. */ public class NodeStateMap { - /** * Node id to node info map. */ private final ConcurrentHashMap<UUID, DatanodeInfo> nodeMap; /** - * Represents the current state of node. - */ - private final ConcurrentHashMap<NodeState, Set<UUID>> stateMap; - /** * Node to set of containers on the node. */ private final ConcurrentHashMap<UUID, Set<ContainerID>> nodeToContainer; @@ -57,29 +55,18 @@ public class NodeStateMap { public NodeStateMap() { lock = new ReentrantReadWriteLock(); nodeMap = new ConcurrentHashMap<>(); - stateMap = new ConcurrentHashMap<>(); nodeToContainer = new ConcurrentHashMap<>(); - initStateMap(); - } - - /** - * Initializes the state map with available states. - */ - private void initStateMap() { - for (NodeState state : NodeState.values()) { - stateMap.put(state, new HashSet<>()); - } } /** * Adds a node to NodeStateMap. * * @param datanodeDetails DatanodeDetails - * @param nodeState initial NodeState + * @param nodeStatus initial NodeStatus * * @throws NodeAlreadyExistsException if the node already exist */ - public void addNode(DatanodeDetails datanodeDetails, NodeState nodeState) + public void addNode(DatanodeDetails datanodeDetails, NodeStatus nodeStatus) throws NodeAlreadyExistsException { lock.writeLock().lock(); try { @@ -87,34 +74,54 @@ public class NodeStateMap { if (nodeMap.containsKey(id)) { throw new NodeAlreadyExistsException("Node UUID: " + id); } - nodeMap.put(id, new DatanodeInfo(datanodeDetails)); + nodeMap.put(id, new DatanodeInfo(datanodeDetails, nodeStatus)); nodeToContainer.put(id, Collections.emptySet()); - stateMap.get(nodeState).add(id); } finally { lock.writeLock().unlock(); } } /** - * Updates the node state. + * Updates the node health state. 
* * @param nodeId Node Id - * @param currentState current state - * @param newState new state + * @param newHealth new health state * * @throws NodeNotFoundException if the node is not present */ - public void updateNodeState(UUID nodeId, NodeState currentState, - NodeState newState)throws NodeNotFoundException { - lock.writeLock().lock(); + public NodeStatus updateNodeHealthState(UUID nodeId, NodeState newHealth) + throws NodeNotFoundException { try { - checkIfNodeExist(nodeId); - if (stateMap.get(currentState).remove(nodeId)) { - stateMap.get(newState).add(nodeId); - } else { - throw new NodeNotFoundException("Node UUID: " + nodeId + - ", not found in state: " + currentState); - } + lock.writeLock().lock(); + DatanodeInfo dn = getNodeInfo(nodeId); + NodeStatus oldStatus = dn.getNodeStatus(); + NodeStatus newStatus = new NodeStatus( + oldStatus.getOperationalState(), newHealth); + dn.setNodeStatus(newStatus); + return newStatus; + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Updates the node operational state. + * + * @param nodeId Node Id + * @param newOpState new operational state + * + * @throws NodeNotFoundException if the node is not present + */ + public NodeStatus updateNodeOperationalState(UUID nodeId, + NodeOperationalState newOpState) throws NodeNotFoundException { + try { + lock.writeLock().lock(); + DatanodeInfo dn = getNodeInfo(nodeId); + NodeStatus oldStatus = dn.getNodeStatus(); + NodeStatus newStatus = new NodeStatus( + newOpState, oldStatus.getHealth()); + dn.setNodeStatus(newStatus); + return newStatus; } finally { lock.writeLock().unlock(); } @@ -139,21 +146,38 @@ public class NodeStateMap { } } - /** * Returns the list of node ids which are in the specified state. 
* - * @param state NodeState + * @param status NodeStatus * * @return list of node ids */ - public List<UUID> getNodes(NodeState state) { - lock.readLock().lock(); - try { - return new ArrayList<>(stateMap.get(state)); - } finally { - lock.readLock().unlock(); + public List<UUID> getNodes(NodeStatus status) { + ArrayList<UUID> nodes = new ArrayList<>(); + for (DatanodeInfo dn : filterNodes(status)) { + nodes.add(dn.getUuid()); + } + return nodes; + } + + /** + * Returns the list of node ids which match the desired operational state + * and health. Passing a null for either value is equivalent to a wild card. + * + * Therefore, passing opState = null, health=stale will return all stale nodes + * regardless of their operational state. + * + * @param opState + * @param health + * @return The list of nodes matching the given states + */ + public List<UUID> getNodes(NodeOperationalState opState, NodeState health) { + ArrayList<UUID> nodes = new ArrayList<>(); + for (DatanodeInfo dn : filterNodes(opState, health)) { + nodes.add(dn.getUuid()); } + return nodes; } /** @@ -162,8 +186,8 @@ public class NodeStateMap { * @return list of all the node ids */ public List<UUID> getAllNodes() { - lock.readLock().lock(); try { + lock.readLock().lock(); return new ArrayList<>(nodeMap.keySet()); } finally { lock.readLock().unlock(); @@ -171,22 +195,72 @@ public class NodeStateMap { } /** - * Returns the count of nodes in the specified state. - * - * @param state NodeState + * Returns the list of all the nodes as DatanodeInfo objects. 
* - * @return Number of nodes in the specified state + * @return list of all the node ids */ - public int getNodeCount(NodeState state) { - lock.readLock().lock(); + public List<DatanodeInfo> getAllDatanodeInfos() { try { - return stateMap.get(state).size(); + lock.readLock().lock(); + return new ArrayList<>(nodeMap.values()); } finally { lock.readLock().unlock(); } } /** + * Returns a list of the nodes as DatanodeInfo objects matching the passed + * status. + * + * @param status - The status of the nodes to return + * @return List of DatanodeInfo for the matching nodes + */ + public List<DatanodeInfo> getDatanodeInfos(NodeStatus status) { + return filterNodes(status); + } + + /** + * Returns a list of the nodes as DatanodeInfo objects matching the passed + * states. Passing null for either of the state values acts as a wildcard + * for that state. + * + * @param opState - The node operational state + * @param health - The node health + * @return List of DatanodeInfo for the matching nodes + */ + public List<DatanodeInfo> getDatanodeInfos( + NodeOperationalState opState, NodeState health) { + return filterNodes(opState, health); + } + + /** + * Returns the count of nodes in the specified state. + * + * @param state NodeStatus + * + * @return Number of nodes in the specified state + */ + public int getNodeCount(NodeStatus state) { + return getNodes(state).size(); + } + + /** + * Returns the count of node ids which match the desired operational state + * and health. Passing a null for either value is equivalent to a wild card. + * + * Therefore, passing opState=null, health=stale will count all stale nodes + * regardless of their operational state. + * + * @param opState + * @param health + * + * @return Number of nodes in the specified state + */ + public int getNodeCount(NodeOperationalState opState, NodeState health) { + return getNodes(opState, health).size(); + } + + /** * Returns the total node count. 
* * @return node count @@ -209,17 +283,15 @@ public class NodeStateMap { * * @throws NodeNotFoundException if the node is not found */ - public NodeState getNodeState(UUID uuid) throws NodeNotFoundException { + public NodeStatus getNodeStatus(UUID uuid) throws NodeNotFoundException { lock.readLock().lock(); try { - checkIfNodeExist(uuid); - for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) { - if (entry.getValue().contains(uuid)) { - return entry.getKey(); - } + DatanodeInfo dn = nodeMap.get(uuid); + if (dn == null) { + throw new NodeNotFoundException("Node not found in node map." + + " UUID: " + uuid); } - throw new NodeNotFoundException("Node not found in node state map." + - " UUID: " + uuid); + return dn.getNodeStatus(); } finally { lock.readLock().unlock(); } @@ -289,12 +361,13 @@ public class NodeStateMap { */ @Override public String toString() { + // TODO - fix this method to include the commented out values StringBuilder builder = new StringBuilder(); builder.append("Total number of nodes: ").append(getTotalNodeCount()); - for (NodeState state : NodeState.values()) { - builder.append("Number of nodes in ").append(state).append(" state: ") - .append(getNodeCount(state)); - } + // for (NodeState state : NodeState.values()) { + // builder.append("Number of nodes in ").append(state).append(" state: ") + // .append(getNodeCount(state)); + // } return builder.toString(); } @@ -309,4 +382,50 @@ public class NodeStateMap { throw new NodeNotFoundException("Node UUID: " + uuid); } } + + /** + * Create a list of datanodeInfo for all nodes matching the passed states. + * Passing null for one of the states acts like a wildcard for that state. 
+ * + * @param opState + * @param health + * @return List of DatanodeInfo objects matching the passed state + */ + private List<DatanodeInfo> filterNodes( + NodeOperationalState opState, NodeState health) { + if (opState != null && health != null) { + return filterNodes(new NodeStatus(opState, health)); + } + if (opState == null && health == null) { + return getAllDatanodeInfos(); + } + try { + lock.readLock().lock(); + return nodeMap.values().stream() + .filter(n -> opState == null + || n.getNodeStatus().getOperationalState() == opState) + .filter(n -> health == null + || n.getNodeStatus().getHealth() == health) + .collect(Collectors.toList()); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Create a list of datanodeInfo for all nodes matching the passsed status. + * + * @param status + * @return List of DatanodeInfo objects matching the passed state + */ + private List<DatanodeInfo> filterNodes(NodeStatus status) { + try { + lock.readLock().lock(); + return nodeMap.values().stream() + .filter(n -> n.getNodeStatus().equals(status)) + .collect(Collectors.toList()); + } finally { + lock.readLock().unlock(); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index a5e3d37..1cebef6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -22,12 +22,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import 
org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.hdds.ratis.RatisHelper; @@ -141,7 +141,7 @@ public class RatisPipelineProvider implements PipelineProvider { // Get list of healthy nodes List<DatanodeDetails> dns = - nodeManager.getNodes(NodeState.HEALTHY) + nodeManager.getNodes(NodeStatus.inServiceHealthy()) .parallelStream() .filter(dn -> !dnsUsed.contains(dn)) .limit(factor.getNumber()) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index ab98dfa..dad26b58 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import java.io.IOException; @@ -43,7 +43,7 @@ public class SimplePipelineProvider implements PipelineProvider { @Override public Pipeline create(ReplicationFactor factor) throws IOException { 
List<DatanodeDetails> dns = - nodeManager.getNodes(NodeState.HEALTHY); + nodeManager.getNodes(NodeStatus.inServiceHealthy()); if (dns.size() < factor.getNumber()) { String e = String .format("Cannot create pipeline of factor %d using %d nodes.", diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 7d9cb3e..9e9d2fe 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -548,7 +548,9 @@ public class SCMClientProtocolServer implements */ private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) { Set<DatanodeDetails> returnSet = new TreeSet<>(); - List<DatanodeDetails> tmp = scm.getScmNodeManager().getNodes(nodeState); + // TODO - decomm states needed + List<DatanodeDetails> tmp = scm.getScmNodeManager() + .getNodes(null, nodeState); if ((tmp != null) && (tmp.size() > 0)) { returnSet.addAll(tmp); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 4ecab37..702102b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -923,7 +923,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl * @return int -- count */ public int getNodeCount(NodeState nodestate) { - return scmNodeManager.getNodeCount(nodestate); + // TODO - decomm - this probably needs to accept opState and health + return scmNodeManager.getNodeCount(null, nodestate); } /** diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index e5c4766..bc1e771 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -264,7 +265,7 @@ public class TestBlockManager { // create pipelines for (int i = 0; - i < nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size(); i++) { + i < nodeManager.getNodes(NodeStatus.inServiceHealthy()).size(); i++) { pipelineManager.createPipeline(type, factor); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index b7a9813..e27a451 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.Node; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import 
org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -147,14 +148,28 @@ public class MockNodeManager implements NodeManager { this.safemode = safemode; } + /** * Gets all Live Datanodes that is currently communicating with SCM. * - * @param nodestate - State of the node + * @param status The status of the node + * @return List of Datanodes that are Heartbeating SCM. + */ + @Override + public List<DatanodeDetails> getNodes(NodeStatus status) { + return getNodes(status.getOperationalState(), status.getHealth()); + } + + /** + * Gets all Live Datanodes that is currently communicating with SCM. + * + * @param opState - The operational State of the node + * @param nodestate - The health of the node * @return List of Datanodes that are Heartbeating SCM. */ @Override - public List<DatanodeDetails> getNodes(HddsProtos.NodeState nodestate) { + public List<DatanodeDetails> getNodes( + HddsProtos.NodeOperationalState opState, HddsProtos.NodeState nodestate) { if (nodestate == HEALTHY) { return healthyNodes; } @@ -173,12 +188,24 @@ public class MockNodeManager implements NodeManager { /** * Returns the Number of Datanodes that are communicating with SCM. * + * @param status - Status of the node + * @return int -- count + */ + @Override + public int getNodeCount(NodeStatus status) { + return getNodeCount(status.getOperationalState(), status.getHealth()); + } + + /** + * Returns the Number of Datanodes that are communicating with SCM. 
+ * * @param nodestate - State of the node * @return int -- count */ @Override - public int getNodeCount(HddsProtos.NodeState nodestate) { - List<DatanodeDetails> nodes = getNodes(nodestate); + public int getNodeCount( + HddsProtos.NodeOperationalState opState, HddsProtos.NodeState nodestate) { + List<DatanodeDetails> nodes = getNodes(opState, nodestate); if (nodes != null) { return nodes.size(); } @@ -419,7 +446,7 @@ public class MockNodeManager implements NodeManager { public Map<String, Integer> getNodeCount() { Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) { - nodeCountMap.put(state.toString(), getNodeCount(state)); + nodeCountMap.put(state.toString(), getNodeCount(null, state)); } return nodeCountMap; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 41585bc..2a93e3b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; @@ -28,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import 
org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; @@ -114,7 +114,7 @@ public class TestContainerReportHandler { final ContainerReportHandler reportHandler = new ContainerReportHandler( nodeManager, containerManager); final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( - NodeState.HEALTHY).iterator(); + NodeStatus.inServiceHealthy()).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); @@ -183,7 +183,7 @@ public class TestContainerReportHandler { nodeManager, containerManager); final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( - NodeState.HEALTHY).iterator(); + NodeStatus.inServiceHealthy()).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); @@ -262,7 +262,7 @@ public class TestContainerReportHandler { nodeManager, containerManager); final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( - NodeState.HEALTHY).iterator(); + NodeStatus.inServiceHealthy()).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); @@ -341,7 +341,7 @@ public class TestContainerReportHandler { nodeManager, containerManager); final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( - NodeState.HEALTHY).iterator(); + NodeStatus.inServiceHealthy()).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); @@ 
-418,7 +418,7 @@ public class TestContainerReportHandler { final ContainerReportHandler reportHandler = new ContainerReportHandler( nodeManager, containerManager); final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( - NodeState.HEALTHY).iterator(); + NodeStatus.inServiceHealthy()).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java index 18c4a64..54c4080 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm.container.placement.algorithms; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -29,6 +28,7 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.net.NodeSchema; import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -88,7 +88,7 @@ public class TestContainerPlacementFactory { // create mock node manager nodeManager = Mockito.mock(NodeManager.class); - 
when(nodeManager.getNodes(NodeState.HEALTHY)) + when(nodeManager.getNodes(NodeStatus.inServiceHealthy())) .thenReturn(new ArrayList<>(datanodes)); when(nodeManager.getNodeStat(anyObject())) .thenReturn(new SCMNodeMetric(storageCapacity, 0L, 100L)); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java index 00ec398..fcb4f44 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@ -24,12 +24,12 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.junit.Assert; import org.junit.Test; import static org.mockito.Matchers.anyObject; @@ -51,7 +51,7 @@ public class TestSCMContainerPlacementCapacity { } NodeManager mockNodeManager = Mockito.mock(NodeManager.class); - when(mockNodeManager.getNodes(NodeState.HEALTHY)) + when(mockNodeManager.getNodes(NodeStatus.inServiceHealthy())) .thenReturn(new ArrayList<>(datanodes)); when(mockNodeManager.getNodeStat(anyObject())) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java index 2d8b816..03dd829 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java @@ -20,7 +20,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -30,6 +29,7 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.net.NodeSchema; import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -102,7 +102,7 @@ public class TestSCMContainerPlacementRackAware { // create mock node manager nodeManager = Mockito.mock(NodeManager.class); - when(nodeManager.getNodes(NodeState.HEALTHY)) + when(nodeManager.getNodes(NodeStatus.inServiceHealthy())) .thenReturn(new ArrayList<>(datanodes)); when(nodeManager.getNodeStat(anyObject())) .thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 0L, 100L)); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java index 43e3a8d..5edb25f 100644 --- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java @@ -22,12 +22,12 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.junit.Assert; import org.junit.Test; import static org.mockito.Matchers.anyObject; @@ -50,7 +50,7 @@ public class TestSCMContainerPlacementRandom { } NodeManager mockNodeManager = Mockito.mock(NodeManager.class); - when(mockNodeManager.getNodes(NodeState.HEALTHY)) + when(mockNodeManager.getNodes(NodeStatus.inServiceHealthy())) .thenReturn(new ArrayList<>(datanodes)); when(mockNodeManager.getNodeStat(anyObject())) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 26ffd8d..af9d5e5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -150,7 +150,7 @@ public class TestContainerPlacement { //TODO: wait for heartbeat to be processed Thread.sleep(4 * 1000); - assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); + assertEquals(nodeCount, nodeManager.getNodeCount(null, HEALTHY)); assertEquals(capacity * nodeCount, (long) 
nodeManager.getStats().getCapacity().get()); assertEquals(used * nodeCount, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java new file mode 100644 index 0000000..bc28a43 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNull; + +/** + * Class to test the NodeStateManager, which is an internal class used by + * the SCMNodeManager. + */ + +public class TestNodeStateManager { + + private NodeStateManager nsm; + private Configuration conf; + private MockEventPublisher eventPublisher; + + @Before + public void setUp() { + conf = new Configuration(); + eventPublisher = new MockEventPublisher(); + nsm = new NodeStateManager(conf, eventPublisher); + } + + @After + public void tearDown() { + } + + @Test + public void testNodeCanBeAddedAndRetrieved() + throws NodeAlreadyExistsException, NodeNotFoundException { + // Create a datanode, then add and retrieve it + DatanodeDetails dn = generateDatanode(); + nsm.addNode(dn); + assertEquals(dn.getUuid(), nsm.getNode(dn).getUuid()); + // Now get the status of the newly added node and it should be + // IN_SERVICE and HEALTHY + NodeStatus expectedState = NodeStatus.inServiceHealthy(); + assertEquals(expectedState, nsm.getNodeStatus(dn)); + } + + @Test + public void testGetAllNodesReturnsCorrectly() + throws NodeAlreadyExistsException { + DatanodeDetails dn = generateDatanode(); + nsm.addNode(dn); + dn = generateDatanode(); + nsm.addNode(dn); + 
assertEquals(2, nsm.getAllNodes().size()); + assertEquals(2, nsm.getTotalNodeCount()); + } + + @Test + public void testGetNodeCountReturnsCorrectly() + throws NodeAlreadyExistsException { + DatanodeDetails dn = generateDatanode(); + nsm.addNode(dn); + assertEquals(1, nsm.getNodes(NodeStatus.inServiceHealthy()).size()); + assertEquals(0, nsm.getNodes(NodeStatus.inServiceStale()).size()); + } + + @Test + public void testGetNodeCount() throws NodeAlreadyExistsException { + DatanodeDetails dn = generateDatanode(); + nsm.addNode(dn); + assertEquals(1, nsm.getNodeCount(NodeStatus.inServiceHealthy())); + assertEquals(0, nsm.getNodeCount(NodeStatus.inServiceStale())); + } + + @Test + public void testNodesMarkedDeadAndStale() + throws NodeAlreadyExistsException, NodeNotFoundException { + long now = Time.monotonicNow(); + + // Set the dead and stale limits to be 1 second larger than configured + long staleLimit = HddsServerUtil.getStaleNodeInterval(conf) + 1000; + long deadLimit = HddsServerUtil.getDeadNodeInterval(conf) + 1000; + + DatanodeDetails staleDn = generateDatanode(); + nsm.addNode(staleDn); + nsm.getNode(staleDn).updateLastHeartbeatTime(now - staleLimit); + + DatanodeDetails deadDn = generateDatanode(); + nsm.addNode(deadDn); + nsm.getNode(deadDn).updateLastHeartbeatTime(now - deadLimit); + + DatanodeDetails healthyDn = generateDatanode(); + nsm.addNode(healthyDn); + nsm.getNode(healthyDn).updateLastHeartbeatTime(); + + nsm.checkNodesHealth(); + assertEquals(healthyDn, nsm.getHealthyNodes().get(0)); + // A node cannot go directly to dead. It must be marked stale first + // due to the allowed state transitions. 
Therefore we will initially have 2 + // stale nodesCheck it is in stale nodes + assertEquals(2, nsm.getStaleNodes().size()); + // Now check health again and it should be in deadNodes() + nsm.checkNodesHealth(); + assertEquals(staleDn, nsm.getStaleNodes().get(0)); + assertEquals(deadDn, nsm.getDeadNodes().get(0)); + } + + @Test + public void testNodeCanTransitionThroughHealthStatesAndFiresEvents() + throws NodeAlreadyExistsException, NodeNotFoundException { + long now = Time.monotonicNow(); + + // Set the dead and stale limits to be 1 second larger than configured + long staleLimit = HddsServerUtil.getStaleNodeInterval(conf) + 1000; + long deadLimit = HddsServerUtil.getDeadNodeInterval(conf) + 1000; + + DatanodeDetails dn = generateDatanode(); + nsm.addNode(dn); + assertEquals("New_Node", eventPublisher.getLastEvent().getName()); + DatanodeInfo dni = nsm.getNode(dn); + dni.updateLastHeartbeatTime(); + + // Ensure node is initially healthy + eventPublisher.clearEvents(); + nsm.checkNodesHealth(); + assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth()); + assertNull(eventPublisher.getLastEvent()); + + // Set the heartbeat old enough to make it stale + dni.updateLastHeartbeatTime(now - staleLimit); + nsm.checkNodesHealth(); + assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth()); + assertEquals("Stale_Node", eventPublisher.getLastEvent().getName()); + + // Now make it dead + dni.updateLastHeartbeatTime(now - deadLimit); + nsm.checkNodesHealth(); + assertEquals(NodeState.DEAD, nsm.getNodeStatus(dn).getHealth()); + assertEquals("Dead_Node", eventPublisher.getLastEvent().getName()); + + // Transition back to healthy from dead + dni.updateLastHeartbeatTime(); + nsm.checkNodesHealth(); + assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth()); + assertEquals("NON_HEALTHY_TO_HEALTHY_NODE", + eventPublisher.getLastEvent().getName()); + + // Make the node stale again, and transition to healthy. 
+ dni.updateLastHeartbeatTime(now - staleLimit); + nsm.checkNodesHealth(); + assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth()); + assertEquals("Stale_Node", eventPublisher.getLastEvent().getName()); + dni.updateLastHeartbeatTime(); + nsm.checkNodesHealth(); + assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth()); + assertEquals("NON_HEALTHY_TO_HEALTHY_NODE", + eventPublisher.getLastEvent().getName()); + } + + private DatanodeDetails generateDatanode() { + String uuid = UUID.randomUUID().toString(); + return DatanodeDetails.newBuilder().setUuid(uuid).build(); + } + + static class MockEventPublisher implements EventPublisher { + + private List<Event> events = new ArrayList<>(); + private List<Object> payloads = new ArrayList<>(); + + public void clearEvents() { + events.clear(); + payloads.clear(); + } + + public List<Event> getEvents() { + return events; + } + + public Event getLastEvent() { + if (events.size() == 0) { + return null; + } else { + return events.get(events.size()-1); + } + } + + @Override + public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void + fireEvent(EVENT_TYPE event, PAYLOAD payload) { + events.add(event); + payloads.add(payload); + } + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index d028851..a37142f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -72,9 +72,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - 
.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; import static org.junit.Assert.assertEquals; @@ -238,7 +235,8 @@ public class TestSCMNodeManager { } //TODO: wait for heartbeat to be processed Thread.sleep(4 * 1000); - assertEquals(count, nodeManager.getNodeCount(HEALTHY)); + assertEquals(count, nodeManager.getNodeCount( + NodeStatus.inServiceHealthy())); } } @@ -312,9 +310,10 @@ public class TestSCMNodeManager { // Wait for 2 seconds, wait a total of 4 seconds to make sure that the // node moves into stale state. Thread.sleep(2 * 1000); - List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE); + List<DatanodeDetails> staleNodeList = + nodeManager.getNodes(NodeStatus.inServiceStale()); assertEquals("Expected to find 1 stale node", - 1, nodeManager.getNodeCount(STALE)); + 1, nodeManager.getNodeCount(NodeStatus.inServiceStale())); assertEquals("Expected to find 1 stale node", 1, staleNodeList.size()); assertEquals("Stale node is not the expected ID", staleNode @@ -331,16 +330,17 @@ public class TestSCMNodeManager { Thread.sleep(2 * 1000); // the stale node has been removed - staleNodeList = nodeManager.getNodes(STALE); + staleNodeList = nodeManager.getNodes(NodeStatus.inServiceStale()); assertEquals("Expected to find 1 stale node", - 0, nodeManager.getNodeCount(STALE)); + 0, nodeManager.getNodeCount(NodeStatus.inServiceStale())); assertEquals("Expected to find 1 stale node", 0, staleNodeList.size()); // Check for the dead node now. 
- List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD); + List<DatanodeDetails> deadNodeList = + nodeManager.getNodes(NodeStatus.inServiceDead()); assertEquals("Expected to find 1 dead node", 1, - nodeManager.getNodeCount(DEAD)); + nodeManager.getNodeCount(NodeStatus.inServiceDead())); assertEquals("Expected to find 1 dead node", 1, deadNodeList.size()); assertEquals("Dead node is not the expected ID", staleNode @@ -388,8 +388,8 @@ public class TestSCMNodeManager { //Assert all nodes are healthy. assertEquals(2, nodeManager.getAllNodes().size()); - assertEquals(2, nodeManager.getNodeCount(HEALTHY)); - + assertEquals(2, + nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); /** * Simulate a JVM Pause and subsequent handling in following steps: * Step 1 : stop heartbeat check process for stale node interval @@ -424,7 +424,7 @@ public class TestSCMNodeManager { // Step 4 : all nodes should still be HEALTHY assertEquals(2, nodeManager.getAllNodes().size()); - assertEquals(2, nodeManager.getNodeCount(HEALTHY)); + assertEquals(2, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); // Step 5 : heartbeat for node1 nodeManager.processHeartbeat(node1); @@ -433,8 +433,8 @@ public class TestSCMNodeManager { Thread.sleep(1000); // Step 7 : node2 should transition to STALE - assertEquals(1, nodeManager.getNodeCount(HEALTHY)); - assertEquals(1, nodeManager.getNodeCount(STALE)); + assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); + assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceStale())); } } @@ -533,7 +533,7 @@ public class TestSCMNodeManager { //Assert all nodes are healthy. assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); /** * Cluster state: Quiesced: We are going to sleep for 3 seconds. 
Which @@ -541,7 +541,7 @@ public class TestSCMNodeManager { */ Thread.sleep(3 * 1000); assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(3, nodeManager.getNodeCount(STALE)); + assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceStale())); /** @@ -559,18 +559,19 @@ public class TestSCMNodeManager { Thread.sleep(1500); nodeManager.processHeartbeat(healthyNode); Thread.sleep(2 * 1000); - assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); // 3.5 seconds from last heartbeat for the stale and deadNode. So those // 2 nodes must move to Stale state and the healthy node must // remain in the healthy State. - List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY); + List<DatanodeDetails> healthyList = nodeManager.getNodes( + NodeStatus.inServiceHealthy()); assertEquals("Expected one healthy node", 1, healthyList.size()); assertEquals("Healthy node is not the expected ID", healthyNode .getUuid(), healthyList.get(0).getUuid()); - assertEquals(2, nodeManager.getNodeCount(STALE)); + assertEquals(2, nodeManager.getNodeCount(NodeStatus.inServiceStale())); /** * Cluster State: Allow healthyNode to remain in healthy state and @@ -586,14 +587,16 @@ public class TestSCMNodeManager { // 3.5 seconds have elapsed for stale node, so it moves into Stale. // 7 seconds have elapsed for dead node, so it moves into dead. // 2 Seconds have elapsed for healthy node, so it stays in healthy state. 
- healthyList = nodeManager.getNodes(HEALTHY); - List<DatanodeDetails> staleList = nodeManager.getNodes(STALE); - List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); + healthyList = nodeManager.getNodes((NodeStatus.inServiceHealthy())); + List<DatanodeDetails> staleList = + nodeManager.getNodes(NodeStatus.inServiceStale()); + List<DatanodeDetails> deadList = + nodeManager.getNodes(NodeStatus.inServiceDead()); assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(1, nodeManager.getNodeCount(HEALTHY)); - assertEquals(1, nodeManager.getNodeCount(STALE)); - assertEquals(1, nodeManager.getNodeCount(DEAD)); + assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); + assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceStale())); + assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceDead())); assertEquals("Expected one healthy node", 1, healthyList.size()); @@ -619,7 +622,7 @@ public class TestSCMNodeManager { Thread.sleep(500); //Assert all nodes are healthy. assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); } } @@ -668,7 +671,7 @@ public class TestSCMNodeManager { */ private boolean findNodes(NodeManager nodeManager, int count, HddsProtos.NodeState state) { - return count == nodeManager.getNodeCount(state); + return count == nodeManager.getNodeCount(NodeStatus.inServiceStale()); } /** @@ -741,11 +744,14 @@ public class TestSCMNodeManager { // Assert all healthy nodes are healthy now, this has to be a greater // than check since Stale nodes can be healthy when we check the state. 
- assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount); + assertTrue(nodeManager.getNodeCount(NodeStatus.inServiceHealthy()) + >= healthyCount); - assertEquals(deadCount, nodeManager.getNodeCount(DEAD)); + assertEquals(deadCount, + nodeManager.getNodeCount(NodeStatus.inServiceDead())); - List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); + List<DatanodeDetails> deadList = + nodeManager.getNodes(NodeStatus.inServiceDead()); for (DatanodeDetails node : deadList) { assertTrue(deadNodeList.contains(node)); @@ -861,7 +867,8 @@ public class TestSCMNodeManager { } //TODO: wait for heartbeat to be processed Thread.sleep(4 * 1000); - assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); + assertEquals(nodeCount, + nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); assertEquals(capacity * nodeCount, (long) nodeManager.getStats() .getCapacity().get()); assertEquals(used * nodeCount, (long) nodeManager.getStats() @@ -953,7 +960,7 @@ public class TestSCMNodeManager { // Wait up to 4s so that the node becomes stale // Verify the usage info should be unchanged. GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(STALE) == 1, 100, + () -> nodeManager.getNodeCount(NodeStatus.inServiceStale()) == 1, 100, 4 * 1000); assertEquals(nodeCount, nodeManager.getNodeStats().size()); @@ -971,7 +978,7 @@ public class TestSCMNodeManager { // Wait up to 4 more seconds so the node becomes dead // Verify usage info should be updated. GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(DEAD) == 1, 100, + () -> nodeManager.getNodeCount(NodeStatus.inServiceDead()) == 1, 100, 4 * 1000); assertEquals(0, nodeManager.getNodeStats().size()); @@ -989,7 +996,7 @@ public class TestSCMNodeManager { // Wait up to 5 seconds so that the dead node becomes healthy // Verify usage info should be updated. 
GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(HEALTHY) == 1, + () -> nodeManager.getNodeCount(NodeStatus.inServiceHealthy()) == 1, 100, 5 * 1000); GenericTestUtils.waitFor( () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, @@ -1100,7 +1107,8 @@ public class TestSCMNodeManager { // verify network topology cluster has all the registered nodes Thread.sleep(4 * 1000); NetworkTopology clusterMap = scm.getClusterMap(); - assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); + assertEquals(nodeCount, + nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); assertEquals(nodeCount, clusterMap.getNumOfLeafNode("")); assertEquals(4, clusterMap.getMaxLevel()); List<DatanodeDetails> nodeList = nodeManager.getAllNodes(); @@ -1142,7 +1150,8 @@ public class TestSCMNodeManager { // verify network topology cluster has all the registered nodes Thread.sleep(4 * 1000); NetworkTopology clusterMap = scm.getClusterMap(); - assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); + assertEquals(nodeCount, + nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); assertEquals(nodeCount, clusterMap.getNumOfLeafNode("")); assertEquals(3, clusterMap.getMaxLevel()); List<DatanodeDetails> nodeList = nodeManager.getAllNodes(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java new file mode 100644 index 0000000..482f444 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node.states; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.UUID; + +import static junit.framework.TestCase.assertEquals; + +/** + * Class to test the NodeStateMap class, which is an internal class used by + * NodeStateManager. 
+ */ + +public class TestNodeStateMap { + + private NodeStateMap map; + + @Before + public void setUp() { + map = new NodeStateMap(); + } + + @After + public void tearDown() { + } + + @Test + public void testNodeCanBeAddedAndRetrieved() + throws NodeAlreadyExistsException, NodeNotFoundException { + DatanodeDetails dn = generateDatanode(); + NodeStatus status = NodeStatus.inServiceHealthy(); + map.addNode(dn, status); + assertEquals(dn, map.getNodeInfo(dn.getUuid())); + assertEquals(status, map.getNodeStatus(dn.getUuid())); + } + + @Test + public void testNodeHealthStateCanBeUpdated() + throws NodeAlreadyExistsException, NodeNotFoundException { + DatanodeDetails dn = generateDatanode(); + NodeStatus status = NodeStatus.inServiceHealthy(); + map.addNode(dn, status); + + NodeStatus expectedStatus = NodeStatus.inServiceStale(); + NodeStatus returnedStatus = + map.updateNodeHealthState(dn.getUuid(), expectedStatus.getHealth()); + assertEquals(expectedStatus, returnedStatus); + assertEquals(returnedStatus, map.getNodeStatus(dn.getUuid())); + } + + @Test + public void testNodeOperationalStateCanBeUpdated() + throws NodeAlreadyExistsException, NodeNotFoundException { + DatanodeDetails dn = generateDatanode(); + NodeStatus status = NodeStatus.inServiceHealthy(); + map.addNode(dn, status); + + NodeStatus expectedStatus = new NodeStatus( + NodeOperationalState.DECOMMISSIONING, + NodeState.HEALTHY); + NodeStatus returnedStatus = map.updateNodeOperationalState( + dn.getUuid(), expectedStatus.getOperationalState()); + assertEquals(expectedStatus, returnedStatus); + assertEquals(returnedStatus, map.getNodeStatus(dn.getUuid())); + } + + @Test + public void testGetNodeMethodsReturnCorrectCountsAndStates() + throws NodeAlreadyExistsException { + // Add one node for all possible states + int nodeCount = 0; + for(NodeOperationalState op : NodeOperationalState.values()) { + for(NodeState health : NodeState.values()) { + addRandomNodeWithState(op, health); + nodeCount++; + } + } + 
NodeStatus requestedState = NodeStatus.inServiceStale(); + List<UUID> nodes = map.getNodes(requestedState); + assertEquals(1, nodes.size()); + assertEquals(1, map.getNodeCount(requestedState)); + assertEquals(nodeCount, map.getTotalNodeCount()); + assertEquals(nodeCount, map.getAllNodes().size()); + assertEquals(nodeCount, map.getAllDatanodeInfos().size()); + + // Checks for the getNodeCount(opstate, health) method + assertEquals(nodeCount, map.getNodeCount(null, null)); + assertEquals(1, + map.getNodeCount(NodeOperationalState.DECOMMISSIONING, + NodeState.STALE)); + assertEquals(5, map.getNodeCount(null, NodeState.HEALTHY)); + assertEquals(3, + map.getNodeCount(NodeOperationalState.DECOMMISSIONING, null)); + } + + private void addNodeWithState(DatanodeDetails dn, + NodeOperationalState opState, NodeState health) + throws NodeAlreadyExistsException { + NodeStatus status = new NodeStatus(opState, health); + map.addNode(dn, status); + } + + private void addRandomNodeWithState( + NodeOperationalState opState, NodeState health) + throws NodeAlreadyExistsException { + DatanodeDetails dn = generateDatanode(); + addNodeWithState(dn, opState, health); + } + + private DatanodeDetails generateDatanode() { + String uuid = UUID.randomUUID().toString(); + return DatanodeDetails.newBuilder().setUuid(uuid).build(); + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java index f0b1cbb..2ac5f70 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.scm.exceptions.SCMException; import 
org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.ozone.OzoneConsts; import org.junit.Assert; import org.junit.Test; @@ -34,8 +35,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .HEALTHY; import static org.junit.Assert.assertEquals; /** @@ -45,7 +44,8 @@ public class TestContainerPlacement { private DescriptiveStatistics computeStatistics(NodeManager nodeManager) { DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics(); - for (DatanodeDetails dd : nodeManager.getNodes(HEALTHY)) { + for (DatanodeDetails dd : + nodeManager.getNodes(NodeStatus.inServiceHealthy())) { float weightedValue = nodeManager.getNodeStat(dd).get().getScmUsed().get() / (float) nodeManager.getNodeStat(dd).get().getCapacity().get(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 30a75ef..b584f3f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -17,9 +17,11 @@ package org.apache.hadoop.ozone.container.testutils; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import 
org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -81,22 +83,48 @@ public class ReplicationNodeManagerMock implements NodeManager { /** * Gets all Live Datanodes that is currently communicating with SCM. * - * @param nodestate - State of the node + * @param nodestatus - State of the node * @return List of Datanodes that are Heartbeating SCM. */ @Override - public List<DatanodeDetails> getNodes(NodeState nodestate) { + public List<DatanodeDetails> getNodes(NodeStatus nodestatus) { return null; } /** + * Gets all Live Datanodes that is currently communicating with SCM. + * + * @param opState - Operational state of the node + * @param health - Health of the node + * @return List of Datanodes that are Heartbeating SCM. + */ + @Override + public List<DatanodeDetails> getNodes( + HddsProtos.NodeOperationalState opState, NodeState health) { + return null; + } + + /** + * Returns the Number of Datanodes that are communicating with SCM. + * + * @param nodestatus - State of the node + * @return int -- count + */ + @Override + public int getNodeCount(NodeStatus nodestatus) { + return 0; + } + + /** * Returns the Number of Datanodes that are communicating with SCM. 
* - * @param nodestate - State of the node + * @param opState - Operational state of the node + * @param health - Health of the node * @return int -- count */ @Override - public int getNodeCount(NodeState nodestate) { + public int getNodeCount( + HddsProtos.NodeOperationalState opState, NodeState health) { return 0; } diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java index 7de2e4b..42773f8 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/TopologySubcommand.java @@ -25,8 +25,6 @@ import org.apache.hadoop.hdds.scm.client.ScmClient; import picocli.CommandLine; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONED; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONING; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; @@ -57,8 +55,6 @@ public class TopologySubcommand implements Callable<Void> { stateArray.add(HEALTHY); stateArray.add(STALE); stateArray.add(DEAD); - stateArray.add(DECOMMISSIONING); - stateArray.add(DECOMMISSIONED); } @CommandLine.Option(names = {"-o", "--order"}, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index ba072f8..225281a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -49,7 +49,6 @@ import 
org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; @@ -66,6 +65,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -366,8 +366,7 @@ public class TestStorageContainerManager { NodeManager nodeManager = cluster.getStorageContainerManager() .getScmNodeManager(); List<SCMCommand> commands = nodeManager.processHeartbeat( - nodeManager.getNodes(NodeState.HEALTHY).get(0)); - + nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0)); if (commands != null) { for (SCMCommand cmd : commands) { if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java index 65a6357..98a22a2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java @@ -149,10 +149,6 @@ public class TestSCMNodeMetrics { 
getMetrics(SCMNodeMetrics.class.getSimpleName())); assertGauge("DeadNodes", 0, getMetrics(SCMNodeMetrics.class.getSimpleName())); - assertGauge("DecommissioningNodes", 0, - getMetrics(SCMNodeMetrics.class.getSimpleName())); - assertGauge("DecommissionedNodes", 0, - getMetrics(SCMNodeMetrics.class.getSimpleName())); assertGauge("DiskCapacity", 100L, getMetrics(SCMNodeMetrics.class.getSimpleName())); assertGauge("DiskUsed", 10L, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org