HDDS-78. Add per volume level storage stats in SCM.
Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0cf6e87f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0cf6e87f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0cf6e87f

Branch: refs/heads/HDDS-48
Commit: 0cf6e87f9212af10eae39cdcb1fe60e6d8191772
Parents: f24c842
Author: Anu Engineer <aengin...@apache.org>
Authored: Sat May 26 11:06:22 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Sat May 26 11:11:14 2018 -0700

----------------------------------------------------------------------
 .../placement/metrics/SCMNodeStat.java          |  21 --
 .../hdds/scm/node/SCMNodeStorageStatMXBean.java |   8 +
 .../hdds/scm/node/SCMNodeStorageStatMap.java    | 230 +++++++++++++------
 .../hdds/scm/node/StorageReportResult.java      |  87 +++++++
 .../scm/node/TestSCMNodeStorageStatMap.java     | 141 +++++++++---
 5 files changed, 356 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cf6e87f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
index 4fe72fc..3c871d3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
@@ -136,25 +136,4 @@ public class SCMNodeStat implements NodeStat {
   public int hashCode() {
     return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get());
   }
-
-
-  /**
-   * Truncate to 4 digits since uncontrolled precision is some times
-   * counter intuitive to what users expect.
-   * @param value - double.
-   * @return double.
-   */
-  private double truncateDecimals(double value) {
-    final int multiplier = 10000;
-    return (double) ((long) (value * multiplier)) / multiplier;
-  }
-
-  /**
-   * get the scmUsed ratio
-   */
-  public  double getScmUsedratio() {
-    double scmUsedRatio =
-        truncateDecimals(getScmUsed().get() / (double) getCapacity().get());
-    return scmUsedRatio;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cf6e87f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
index f17a970..d81ff0f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -66,4 +68,10 @@ public interface SCMNodeStorageStatMXBean {
    * @return long
    */
   long getTotalFreeSpace();
+
+  /**
+   * Returns the set of disks for a given Datanode.
+   * @return set of storage volumes
+   */
+  Set<StorageLocationReport> getStorageVolumes(UUID datanodeId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cf6e87f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
index 25cb357..f8ad2af 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
@@ -22,18 +22,18 @@ package org.apache.hadoop.hdds.scm.node;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -52,16 +52,15 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
   private final double warningUtilizationThreshold;
   private final double criticalUtilizationThreshold;
 
-  private final Map<UUID, SCMNodeStat> scmNodeStorageStatMap;
+  private final Map<UUID, Set<StorageLocationReport>> scmNodeStorageReportMap;
   // NodeStorageInfo MXBean
   private ObjectName scmNodeStorageInfoBean;
-  // Aggregated node stats
-  private SCMNodeStat clusterStat;
   /**
-   * constructs the scmNodeStorageStatMap object
+   * constructs the scmNodeStorageReportMap object
    */
   public SCMNodeStorageStatMap(OzoneConfiguration conf) {
-    scmNodeStorageStatMap = new ConcurrentHashMap<>();
+    // scmNodeStorageReportMap = new ConcurrentHashMap<>();
+    scmNodeStorageReportMap = new ConcurrentHashMap<>();
     warningUtilizationThreshold = conf.getDouble(
         OzoneConfigKeys.
             HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD,
@@ -72,7 +71,6 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
             HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD,
         OzoneConfigKeys.
             HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT);
-    clusterStat = new SCMNodeStat();
   }
 
   public enum UtilizationThreshold {
@@ -81,20 +79,22 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
 
   /**
    * Returns true if this a datanode that is already tracked by
-   * scmNodeStorageStatMap.
+   * scmNodeStorageReportMap.
    *
    * @param datanodeID - UUID of the Datanode.
    * @return True if this is tracked, false if this map does not know about it.
    */
   public boolean isKnownDatanode(UUID datanodeID) {
     Preconditions.checkNotNull(datanodeID);
-    return scmNodeStorageStatMap.containsKey(datanodeID);
+    return scmNodeStorageReportMap.containsKey(datanodeID);
   }
 
   public List<UUID> getDatanodeList(
       UtilizationThreshold threshold) {
-    return scmNodeStorageStatMap.entrySet().stream()
-        .filter(entry -> (isThresholdReached(threshold, entry.getValue())))
+    return scmNodeStorageReportMap.entrySet().stream().filter(
+        entry -> (isThresholdReached(threshold,
+            getScmUsedratio(getUsedSpace(entry.getKey()),
+                getCapacity(entry.getKey())))))
         .map(Map.Entry::getKey)
         .collect(Collectors.toList());
   }
@@ -105,19 +105,19 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
    * Insert a new datanode into Node2Container Map.
    *
    * @param datanodeID -- Datanode UUID
-   * @param stat - scmNode stat for the Datanode.
+   * @param report - set of StorageReports.
    */
-  public void insertNewDatanode(UUID datanodeID, SCMNodeStat stat)
+  public void insertNewDatanode(UUID datanodeID, Set<StorageLocationReport> report)
       throws SCMException {
-    Preconditions.checkNotNull(stat);
+    Preconditions.checkNotNull(report);
+    Preconditions.checkState(report.size() != 0);
     Preconditions.checkNotNull(datanodeID);
-    synchronized (scmNodeStorageStatMap) {
+    synchronized (scmNodeStorageReportMap) {
       if (isKnownDatanode(datanodeID)) {
         throw new SCMException("Node already exists in the map",
             DUPLICATE_DATANODE);
       }
-      scmNodeStorageStatMap.put(datanodeID, stat);
-      clusterStat.add(stat);
+      scmNodeStorageReportMap.putIfAbsent(datanodeID, report);
     }
   }
 
@@ -138,72 +138,103 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
    * Updates the Container list of an existing DN.
    *
    * @param datanodeID - UUID of DN.
-   * @param stat - scmNode stat for the Datanode.
+   * @param report - set of Storage Reports for the Datanode.
    * @throws SCMException - if we don't know about this datanode, for new DN
    *                      use insertNewDatanode.
    */
-  public void updateDatanodeMap(UUID datanodeID, SCMNodeStat stat)
+  public void updateDatanodeMap(UUID datanodeID, Set<StorageLocationReport> report)
       throws SCMException {
     Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(stat);
-    synchronized (scmNodeStorageStatMap) {
-      if (!scmNodeStorageStatMap.containsKey(datanodeID)) {
+    Preconditions.checkNotNull(report);
+    Preconditions.checkState(report.size() != 0);
+    synchronized (scmNodeStorageReportMap) {
+      if (!scmNodeStorageReportMap.containsKey(datanodeID)) {
         throw new SCMException("No such datanode", NO_SUCH_DATANODE);
       }
-      SCMNodeStat removed = scmNodeStorageStatMap.get(datanodeID);
-      clusterStat.subtract(removed);
-      scmNodeStorageStatMap.put(datanodeID, stat);
-      clusterStat.add(stat);
+      scmNodeStorageReportMap.put(datanodeID, report);
     }
   }
 
-  public NodeReportStatus processNodeReport(UUID datanodeID,
+  public StorageReportResult processNodeReport(UUID datanodeID,
       StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
-      throws SCMException {
+      throws IOException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(nodeReport);
+
     long totalCapacity = 0;
     long totalRemaining = 0;
     long totalScmUsed = 0;
-    List<StorageContainerDatanodeProtocolProtos.SCMStorageReport>
+    Set<StorageLocationReport> storagReportSet = new HashSet<>();
+    Set<StorageLocationReport> fullVolumeSet = new HashSet<>();
+    Set<StorageLocationReport> failedVolumeSet = new HashSet<>();
+    List<SCMStorageReport>
         storageReports = nodeReport.getStorageReportList();
-    for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report : storageReports) {
+    for (SCMStorageReport report : storageReports) {
+      StorageLocationReport storageReport =
+          StorageLocationReport.getFromProtobuf(report);
+      storagReportSet.add(storageReport);
+      if (report.hasFailed() && report.getFailed()) {
+        failedVolumeSet.add(storageReport);
+      } else if (isThresholdReached(UtilizationThreshold.CRITICAL,
+          getScmUsedratio(report.getScmUsed(), report.getCapacity()))) {
+        fullVolumeSet.add(storageReport);
+      }
       totalCapacity += report.getCapacity();
       totalRemaining += report.getRemaining();
       totalScmUsed += report.getScmUsed();
     }
-    SCMNodeStat stat = scmNodeStorageStatMap.get(datanodeID);
-    if (stat == null) {
-      stat = new SCMNodeStat();
-      stat.set(totalCapacity, totalScmUsed, totalRemaining);
-      insertNewDatanode(datanodeID, stat);
+
+    if (!isKnownDatanode(datanodeID)) {
+      insertNewDatanode(datanodeID, storagReportSet);
     } else {
-      stat.set(totalCapacity, totalScmUsed, totalRemaining);
-      updateDatanodeMap(datanodeID, stat);
+      updateDatanodeMap(datanodeID, storagReportSet);
     }
-    if (isThresholdReached(UtilizationThreshold.CRITICAL, stat)) {
+    if (isThresholdReached(UtilizationThreshold.CRITICAL,
+        getScmUsedratio(totalScmUsed, totalCapacity))) {
       LOG.warn("Datanode {} is out of storage space. Capacity: {}, Used: {}",
-          datanodeID, stat.getCapacity().get(), stat.getScmUsed().get());
-      return NodeReportStatus.DATANODE_OUT_OF_SPACE;
-    } else {
-      if (isThresholdReached(UtilizationThreshold.WARN, stat)) {
-       LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}",
-           datanodeID, stat.getCapacity().get(), stat.getScmUsed().get());
-      }
-      return NodeReportStatus.ALL_IS_WELL;
+          datanodeID, totalCapacity, totalScmUsed);
+      return StorageReportResult.ReportResultBuilder.newBuilder()
+          .setStatus(ReportStatus.DATANODE_OUT_OF_SPACE)
+          .setFullVolumeSet(fullVolumeSet).setFailedVolumeSet(failedVolumeSet)
+          .build();
+    }
+    if (isThresholdReached(UtilizationThreshold.WARN,
+        getScmUsedratio(totalScmUsed, totalCapacity))) {
+      LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}",
+          datanodeID, totalCapacity, totalScmUsed);
     }
+
+    if (failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) {
+      return StorageReportResult.ReportResultBuilder.newBuilder()
+          .setStatus(ReportStatus.STORAGE_OUT_OF_SPACE)
+          .setFullVolumeSet(fullVolumeSet).build();
+    }
+
+    if (!failedVolumeSet.isEmpty() && fullVolumeSet.isEmpty()) {
+      return StorageReportResult.ReportResultBuilder.newBuilder()
+          .setStatus(ReportStatus.FAILED_STORAGE)
+          .setFailedVolumeSet(failedVolumeSet).build();
+    }
+    if (!failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) {
+      return StorageReportResult.ReportResultBuilder.newBuilder()
+          .setStatus(ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE)
+          .setFailedVolumeSet(failedVolumeSet).setFullVolumeSet(fullVolumeSet)
+          .build();
+    }
+    return StorageReportResult.ReportResultBuilder.newBuilder()
+        .setStatus(ReportStatus.ALL_IS_WELL).build();
   }
 
   private boolean isThresholdReached(UtilizationThreshold threshold,
-      SCMNodeStat stat) {
+      double scmUsedratio) {
     switch (threshold) {
     case NORMAL:
-      return stat.getScmUsedratio() < warningUtilizationThreshold;
+      return scmUsedratio < warningUtilizationThreshold;
     case WARN:
-      return stat.getScmUsedratio() >= warningUtilizationThreshold &&
-          stat.getScmUsedratio() < criticalUtilizationThreshold;
+      return scmUsedratio >= warningUtilizationThreshold
+          && scmUsedratio < criticalUtilizationThreshold;
     case CRITICAL:
-      return stat.getScmUsedratio() >= criticalUtilizationThreshold;
+      return scmUsedratio >= criticalUtilizationThreshold;
     default:
       throw new RuntimeException("Unknown UtilizationThreshold value");
     }
@@ -211,67 +242,120 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
 
   @Override
   public long getCapacity(UUID dnId) {
-    return scmNodeStorageStatMap.get(dnId).getCapacity().get();
+    long capacity = 0;
+    Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
+    for (StorageLocationReport report : reportSet) {
+      capacity += report.getCapacity();
+    }
+    return capacity;
   }
 
   @Override
   public long getRemainingSpace(UUID dnId) {
-    return scmNodeStorageStatMap.get(dnId).getRemaining().get();
+    long remaining = 0;
+    Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
+    for (StorageLocationReport report : reportSet) {
+      remaining += report.getRemaining();
+    }
+    return remaining;
   }
 
   @Override
   public long getUsedSpace(UUID dnId) {
-    return scmNodeStorageStatMap.get(dnId).getScmUsed().get();
+    long scmUsed = 0;
+    Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
+    for (StorageLocationReport report : reportSet) {
+      scmUsed += report.getScmUsed();
+    }
+    return scmUsed;
   }
 
   @Override
   public long getTotalCapacity() {
-    return clusterStat.getCapacity().get();
+    long capacity = 0;
+    Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
+    for (UUID id : dnIdSet) {
+      capacity += getCapacity(id);
+    }
+    return capacity;
   }
 
   @Override
   public long getTotalSpaceUsed() {
-    return clusterStat.getScmUsed().get();
+    long scmUsed = 0;
+    Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
+    for (UUID id : dnIdSet) {
+      scmUsed += getUsedSpace(id);
+    }
+    return scmUsed;
   }
 
   @Override
   public long getTotalFreeSpace() {
-    return clusterStat.getRemaining().get();
+    long remaining = 0;
+    Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
+    for (UUID id : dnIdSet) {
+      remaining += getRemainingSpace(id);
+    }
+    return remaining;
   }
 
   /**
-   * removes the dataNode from scmNodeStorageStatMap
+   * removes the dataNode from scmNodeStorageReportMap
    * @param datanodeID
    * @throws SCMException in case the dataNode is not found in the map.
    */
   public void removeDatanode(UUID datanodeID) throws SCMException {
     Preconditions.checkNotNull(datanodeID);
-    synchronized (scmNodeStorageStatMap) {
-      if (!scmNodeStorageStatMap.containsKey(datanodeID)) {
+    synchronized (scmNodeStorageReportMap) {
+      if (!scmNodeStorageReportMap.containsKey(datanodeID)) {
         throw new SCMException("No such datanode", NO_SUCH_DATANODE);
       }
-      SCMNodeStat stat = scmNodeStorageStatMap.remove(datanodeID);
-      clusterStat.subtract(stat);
+      scmNodeStorageReportMap.remove(datanodeID);
     }
   }
 
   /**
-   * Gets the SCMNodeStat for the datanode
+   * Returns the set of storage volumes for a Datanode.
    * @param  datanodeID
-   * @return SCMNodeStat
+   * @return set of storage volumes.
    */
 
-  SCMNodeStat getNodeStat(UUID datanodeID) {
-    return scmNodeStorageStatMap.get(datanodeID);
+  @Override
+  public Set<StorageLocationReport> getStorageVolumes(UUID datanodeID) {
+    return scmNodeStorageReportMap.get(datanodeID);
   }
 
+
+  /**
+   * Truncate to 4 digits since uncontrolled precision is some times
+   * counter intuitive to what users expect.
+   * @param value - double.
+   * @return double.
+   */
+  private double truncateDecimals(double value) {
+    final int multiplier = 10000;
+    return (double) ((long) (value * multiplier)) / multiplier;
+  }
+
+  /**
+   * get the scmUsed ratio
+   */
+  public  double getScmUsedratio(long scmUsed, long capacity) {
+    double scmUsedRatio =
+        truncateDecimals (scmUsed / (double) capacity);
+    return scmUsedRatio;
+  }
   /**
    * Results possible from processing a Node report by
    * Node2ContainerMapper.
    */
-  public enum NodeReportStatus {
+  public enum ReportStatus {
     ALL_IS_WELL,
-    DATANODE_OUT_OF_SPACE
+    DATANODE_OUT_OF_SPACE,
+    STORAGE_OUT_OF_SPACE,
+    FAILED_STORAGE,
+    FAILED_AND_OUT_OF_SPACE_STORAGE
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cf6e87f/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
new file mode 100644
index 0000000..3436e77
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
@@ -0,0 +1,87 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
+
+import java.util.Set;
+
+/**
+ * A Container Report gets processed by the Node2Container and returns the
+ * Report Result class.
+ */
+public class StorageReportResult {
+  private SCMNodeStorageStatMap.ReportStatus status;
+  private Set<StorageLocationReport> fullVolumes;
+  private Set<StorageLocationReport> failedVolumes;
+
+  StorageReportResult(SCMNodeStorageStatMap.ReportStatus status,
+      Set<StorageLocationReport> fullVolumes,
+      Set<StorageLocationReport> failedVolumes) {
+    this.status = status;
+    this.fullVolumes = fullVolumes;
+    this.failedVolumes = failedVolumes;
+  }
+
+  public SCMNodeStorageStatMap.ReportStatus getStatus() {
+    return status;
+  }
+
+  public Set<StorageLocationReport> getFullVolumes() {
+    return fullVolumes;
+  }
+
+  public Set<StorageLocationReport> getFailedVolumes() {
+    return failedVolumes;
+  }
+
+  static class ReportResultBuilder {
+    private SCMNodeStorageStatMap.ReportStatus status;
+    private Set<StorageLocationReport> fullVolumes;
+    private Set<StorageLocationReport> failedVolumes;
+
+    static ReportResultBuilder newBuilder() {
+      return new ReportResultBuilder();
+    }
+
+    public ReportResultBuilder setStatus(
+        SCMNodeStorageStatMap.ReportStatus newstatus) {
+      this.status = newstatus;
+      return this;
+    }
+
+    public ReportResultBuilder setFullVolumeSet(
+        Set<StorageLocationReport> fullVolumes) {
+      this.fullVolumes = fullVolumes;
+      return this;
+    }
+
+    public ReportResultBuilder setFailedVolumeSet(
+        Set<StorageLocationReport> failedVolumes) {
+      this.failedVolumes = failedVolumes;
+      return this;
+    }
+
+    StorageReportResult build() {
+      return new StorageReportResult(status, fullVolumes, failedVolumes);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cf6e87f/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
index 2fa786b..571de77 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
@@ -17,38 +17,56 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.*;
+import org.junit.Rule;
 import org.junit.rules.ExpectedException;
 
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class TestSCMNodeStorageStatMap {
-  private final static int DATANODE_COUNT = 300;
+  private final static int DATANODE_COUNT = 100;
   final long capacity = 10L * OzoneConsts.GB;
   final long used = 2L * OzoneConsts.GB;
   final long remaining = capacity - used;
   private static OzoneConfiguration conf = new OzoneConfiguration();
-  private final Map<UUID, SCMNodeStat> testData = new ConcurrentHashMap<>();
+  private final Map<UUID, Set<StorageLocationReport>> testData =
+      new ConcurrentHashMap<>();
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   private void generateData() {
-    SCMNodeStat stat = new SCMNodeStat();
-    stat.set(capacity, used, remaining);
     for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) {
-      testData.put(UUID.randomUUID(), stat);
+      UUID dnId = UUID.randomUUID();
+      Set<StorageLocationReport> reportSet = new HashSet<>();
+      String path = GenericTestUtils.getTempPath(
+          TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + Integer
+              .toString(dnIndex));
+      StorageLocationReport.Builder builder = StorageLocationReport.newBuilder();
+      builder.setStorageType(StorageType.DISK).setId(dnId.toString())
+          .setStorageLocation(path).setScmUsed(used).setRemaining(remaining)
+          .setCapacity(capacity).setFailed(false);
+      reportSet.add(builder.build());
+      testData.put(UUID.randomUUID(), reportSet);
     }
   }
 
@@ -70,8 +88,8 @@ public class TestSCMNodeStorageStatMap {
     SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
     UUID knownNode = getFirstKey();
     UUID unknownNode = UUID.randomUUID();
-    SCMNodeStat stat = testData.get(knownNode);
-    map.insertNewDatanode(knownNode, stat);
+    Set<StorageLocationReport> report = testData.get(knownNode);
+    map.insertNewDatanode(knownNode, report);
     Assert.assertTrue("Not able to detect a known node",
         map.isKnownDatanode(knownNode));
     Assert.assertFalse("Unknown node detected",
@@ -82,54 +100,89 @@ public class TestSCMNodeStorageStatMap {
   public void testInsertNewDatanode() throws SCMException {
     SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
     UUID knownNode = getFirstKey();
-    SCMNodeStat stat = testData.get(knownNode);
-    map.insertNewDatanode(knownNode, stat);
-    Assert.assertEquals(map.getNodeStat(knownNode).getScmUsed(),
-        testData.get(knownNode).getScmUsed());
+    Set<StorageLocationReport> report = testData.get(knownNode);
+    map.insertNewDatanode(knownNode, report);
+    Assert.assertEquals(map.getStorageVolumes(knownNode),
+        testData.get(knownNode));
     thrown.expect(SCMException.class);
     thrown.expectMessage("already exists");
-    map.insertNewDatanode(knownNode, stat);
+    map.insertNewDatanode(knownNode, report);
   }
 
   @Test
   public void testUpdateUnknownDatanode() throws SCMException {
     SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
     UUID unknownNode = UUID.randomUUID();
-    SCMNodeStat stat = new SCMNodeStat();
-
+    String path = GenericTestUtils.getTempPath(
+        TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + unknownNode
+            .toString());
+    Set<StorageLocationReport> reportSet = new HashSet<>();
+    StorageLocationReport.Builder builder = StorageLocationReport.newBuilder();
+    builder.setStorageType(StorageType.DISK).setId(unknownNode.toString())
+        .setStorageLocation(path).setScmUsed(used).setRemaining(remaining)
+        .setCapacity(capacity).setFailed(false);
+    reportSet.add(builder.build());
     thrown.expect(SCMException.class);
     thrown.expectMessage("No such datanode");
-    map.updateDatanodeMap(unknownNode, stat);
+    map.updateDatanodeMap(unknownNode, reportSet);
   }
 
   @Test
-  public void testProcessNodeReportCheckOneNode() throws SCMException {
+  public void testProcessNodeReportCheckOneNode() throws IOException {
     UUID key = getFirstKey();
-    SCMNodeStat value = testData.get(key);
+    List<SCMStorageReport> reportList = new ArrayList<>();
+    Set<StorageLocationReport> reportSet = testData.get(key);
     SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
-    map.insertNewDatanode(key, value);
+    map.insertNewDatanode(key, reportSet);
     Assert.assertTrue(map.isKnownDatanode(key));
     String storageId = UUID.randomUUID().toString();
     String path =
         GenericTestUtils.getRandomizedTempPath().concat("/" + storageId);
-    long capacity = value.getCapacity().get();
-    long used = value.getScmUsed().get();
-    long remaining = value.getRemaining().get();
+    StorageLocationReport report = reportSet.iterator().next();
+    long capacity = report.getCapacity();
+    long used = report.getScmUsed();
+    long remaining = report.getRemaining();
     List<SCMStorageReport> reports = TestUtils
         .createStorageReport(capacity, used, remaining, path, null, storageId,
             1);
-    SCMNodeStorageStatMap.NodeReportStatus status =
+    StorageReportResult result =
         map.processNodeReport(key, TestUtils.createNodeReport(reports));
-    Assert.assertEquals(status,
-        SCMNodeStorageStatMap.NodeReportStatus.ALL_IS_WELL);
+    Assert.assertEquals(result.getStatus(),
+        SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL);
+    StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
+        SCMNodeReport.newBuilder();
+    SCMStorageReport srb = reportSet.iterator().next().getProtoBufMessage();
+    reportList.add(srb);
+    result = map.processNodeReport(key, TestUtils.createNodeReport(reportList));
+    Assert.assertEquals(result.getStatus(),
+        SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL);
+
+    reportList.add(TestUtils
+        .createStorageReport(capacity, capacity, 0, path, null,
+            UUID.randomUUID().toString(), 1).get(0));
+    result = map.processNodeReport(key, TestUtils.createNodeReport(reportList));
+    Assert.assertEquals(result.getStatus(),
+        SCMNodeStorageStatMap.ReportStatus.STORAGE_OUT_OF_SPACE);
+    // Mark a disk failed 
+    SCMStorageReport srb2 = SCMStorageReport.newBuilder()
+        .setStorageUuid(UUID.randomUUID().toString())
+        .setStorageLocation(srb.getStorageLocation()).setScmUsed(capacity)
+        .setCapacity(capacity).setRemaining(0).setFailed(true).build();
+    reportList.add(srb2);
+    nrb.addAllStorageReport(reportList);
+    result = map.processNodeReport(key, nrb.addStorageReport(srb).build());
+    Assert.assertEquals(result.getStatus(),
+        SCMNodeStorageStatMap.ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE);
+
   }
 
   @Test
-  public void testProcessNodeReportAndSCMStats() throws SCMException {
+  public void testProcessMultipleNodeReports() throws SCMException {
     SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
     int counter = 1;
     // Insert all testData into the SCMNodeStorageStatMap Map.
-    for (Map.Entry<UUID, SCMNodeStat> keyEntry : testData.entrySet()) {
+    for (Map.Entry<UUID, Set<StorageLocationReport>> keyEntry : testData
+        .entrySet()) {
       map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue());
     }
     Assert.assertEquals(DATANODE_COUNT * capacity, map.getTotalCapacity());
@@ -137,9 +190,21 @@ public class TestSCMNodeStorageStatMap {
     Assert.assertEquals(DATANODE_COUNT * used, map.getTotalSpaceUsed());
 
     // upadate 1/4th of the datanode to be full
-    for (Map.Entry<UUID, SCMNodeStat> keyEntry : testData.entrySet()) {
-      SCMNodeStat stat = new SCMNodeStat(capacity, capacity, 0);
-      map.updateDatanodeMap(keyEntry.getKey(), stat);
+    for (Map.Entry<UUID, Set<StorageLocationReport>> keyEntry : testData
+        .entrySet()) {
+      Set<StorageLocationReport> reportSet = new HashSet<>();
+      String path = GenericTestUtils.getTempPath(
+          TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + keyEntry
+              .getKey().toString());
+      StorageLocationReport.Builder builder =
+          StorageLocationReport.newBuilder();
+      builder.setStorageType(StorageType.DISK)
+          .setId(keyEntry.getKey().toString()).setStorageLocation(path)
+          .setScmUsed(capacity).setRemaining(0).setCapacity(capacity)
+          .setFailed(false);
+      reportSet.add(builder.build());
+
+      map.updateDatanodeMap(keyEntry.getKey(), reportSet);
       counter++;
       if (counter > DATANODE_COUNT / 4) {
         break;
@@ -163,7 +228,8 @@ public class TestSCMNodeStorageStatMap {
         map.getTotalSpaceUsed(), 0);
     counter = 1;
     // Remove 1/4 of the DataNodes from the Map
-    for (Map.Entry<UUID, SCMNodeStat> keyEntry : testData.entrySet()) {
+    for (Map.Entry<UUID, Set<StorageLocationReport>> keyEntry : testData
+        .entrySet()) {
       map.removeDatanode(keyEntry.getKey());
       counter++;
       if (counter > DATANODE_COUNT / 4) {
@@ -181,12 +247,13 @@ public class TestSCMNodeStorageStatMap {
         map.getDatanodeList(SCMNodeStorageStatMap.UtilizationThreshold.NORMAL)
             .size(), 0);
 
-    Assert.assertEquals(0.75 * DATANODE_COUNT * capacity, map.getTotalCapacity(), 0);
+    Assert
+        .assertEquals(0.75 * DATANODE_COUNT * capacity, map.getTotalCapacity(),
+            0);
     Assert.assertEquals(0.75 * DATANODE_COUNT * remaining,
         map.getTotalFreeSpace(), 0);
-    Assert.assertEquals(
-        0.75 * DATANODE_COUNT * used ,
-        map.getTotalSpaceUsed(), 0);
+    Assert
+        .assertEquals(0.75 * DATANODE_COUNT * used, map.getTotalSpaceUsed(), 0);
 
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to