Repository: hadoop
Updated Branches:
  refs/heads/branch-2 438901f6c -> 4cdda4ba8


HDFS-11545. Propagate DataNode's slow disks info to the NameNode via Heartbeat. 
Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4cdda4ba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4cdda4ba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4cdda4ba

Branch: refs/heads/branch-2
Commit: 4cdda4ba87f24a172fd1b4395d59643ea5554df0
Parents: 438901f
Author: Hanisha Koneru <hanishakon...@apache.org>
Authored: Wed Apr 5 07:35:09 2017 -0700
Committer: Arpit Agarwal <a...@apache.org>
Committed: Wed Apr 5 07:35:09 2017 -0700

----------------------------------------------------------------------
 .../hdfs/server/protocol/SlowDiskReports.java   | 126 +++++++++++++++++++
 .../DatanodeProtocolClientSideTranslatorPB.java |   7 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   3 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  68 ++++++++++
 .../server/blockmanagement/DatanodeManager.java |   3 +-
 .../server/blockmanagement/SlowPeerTracker.java |   2 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  32 +++--
 .../datanode/metrics/DataNodeDiskMetrics.java   |  35 +++---
 .../hdfs/server/namenode/FSNamesystem.java      |   6 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   6 +-
 .../hdfs/server/protocol/DatanodeProtocol.java  |   3 +-
 .../src/main/proto/DatanodeProtocol.proto       |  15 +++
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |  28 +++++
 .../TestNameNodePrunesMissingStorages.java      |   4 +-
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../server/datanode/TestBPOfferService.java     |   4 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../datanode/TestBpServiceActorScheduler.java   |  20 +--
 .../server/datanode/TestDataNodeLifeline.java   |   8 +-
 .../server/datanode/TestDataNodeMXBean.java     |   2 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   7 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   3 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 26 files changed, 340 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java
new file mode 100644
index 0000000..d548eeb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * A class that allows a DataNode to communicate information about all
+ * its disks that appear to be slow.
+ *
+ * The wire representation of this structure is a list of
+ * SlowDiskReportProto messages.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class SlowDiskReports {
+  /**
+   * A map from a disk's base path on the DataNode to its mean metadata
+   * op latency, mean read I/O latency and mean write I/O latency.
+   *
+   * The NameNode must not attempt to interpret the mean latencies
+   * beyond exposing them as a diagnostic, e.g. via metrics. Also,
+   * comparing latencies across reports from different DataNodes may not
+   * be meaningful and must be avoided.
+   */
+  @Nonnull
+  private final Map<String, Map<DiskOp, Double>> slowDisks;
+
+  /**
+   * An object representing a SlowDiskReports with no entries. Should
+   * be used instead of null or creating new objects when there are
+   * no slow disks to report.
+   */
+  public static final SlowDiskReports EMPTY_REPORT =
+      new SlowDiskReports(ImmutableMap.<String, Map<DiskOp, Double>>of());
+
+  private SlowDiskReports(Map<String, Map<DiskOp, Double>> slowDisks) {
+    this.slowDisks = slowDisks;
+  }
+
+  public static SlowDiskReports create(
+      @Nullable Map<String, Map<DiskOp, Double>> slowDisks) {
+    if (slowDisks == null || slowDisks.isEmpty()) {
+      return EMPTY_REPORT;
+    }
+    return new SlowDiskReports(slowDisks);
+  }
+
+  public Map<String, Map<DiskOp, Double>> getSlowDisks() {
+    return slowDisks;
+  }
+
+  public boolean haveSlowDisks() {
+    return slowDisks.size() > 0;
+  }
+
+  /**
+   * Return true if the two objects represent the same set of slow disk
+   * entries. Primarily for unit testing convenience.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof SlowDiskReports)) {
+      return false;
+    }
+
+    SlowDiskReports that = (SlowDiskReports) o;
+
+    if (this.slowDisks.size() != that.slowDisks.size()) {
+      return false;
+    }
+
+    if (!this.slowDisks.keySet().containsAll(that.slowDisks.keySet()) ||
+        !that.slowDisks.keySet().containsAll(this.slowDisks.keySet())) {
+      return false;
+    }
+
+    // Compare the per-disk latency maps.
+    for (String disk : this.slowDisks.keySet()) {
+      if (!this.slowDisks.get(disk).equals(that.slowDisks.get(disk))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return slowDisks.hashCode();
+  }
+
+  /**
+   * Lists the types of operations on which disk latencies are measured.
+   */
+  public enum DiskOp {
+    METADATA,
+    READ,
+    WRITE
+  }
+}
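
A minimal usage sketch of the new class follows (not part of the commit;
the disk path and latency values are invented for illustration):

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;

import java.util.Map;

public class SlowDiskReportsExample {
  public static void main(String[] args) {
    // One hypothetical disk with an elevated mean write latency.
    Map<String, Map<DiskOp, Double>> stats =
        ImmutableMap.<String, Map<DiskOp, Double>>of(
            "/data/disk1",
            ImmutableMap.<DiskOp, Double>of(DiskOp.WRITE, 124.0));

    SlowDiskReports report = SlowDiskReports.create(stats);
    System.out.println(report.haveSlowDisks());                // true

    // A null or empty map yields the shared empty report.
    SlowDiskReports empty = SlowDiskReports.create(null);
    System.out.println(empty == SlowDiskReports.EMPTY_REPORT); // true
  }
}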

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 1794cbd..fe8fff7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -136,7 +137,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       int xmitsInProgress, int xceiverCount, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      @Nonnull SlowDiskReports slowDisks) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -156,6 +158,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     if (slowPeers.haveSlowPeers()) {
       builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
     }
+    if (slowDisks.haveSlowDisks()) {
+      builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
+    }
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index cdfcf4c..f3528bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -121,7 +121,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes(),
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
-          PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
+          PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
+          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 5ee7847..dbdbfc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeComm
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos
+    .SlowDiskReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
@@ -104,6 +106,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 
@@ -832,6 +835,71 @@ public class PBHelper {
     return SlowPeerReports.create(slowPeersMap);
   }
 
+  public static List<SlowDiskReportProto> convertSlowDiskInfo(
+      SlowDiskReports slowDisks) {
+    if (slowDisks.getSlowDisks().size() == 0) {
+      return Collections.emptyList();
+    }
+
+    List<SlowDiskReportProto> slowDiskInfoProtos =
+        new ArrayList<>(slowDisks.getSlowDisks().size());
+    for (Map.Entry<String, Map<SlowDiskReports.DiskOp, Double>> entry :
+        slowDisks.getSlowDisks().entrySet()) {
+      SlowDiskReportProto.Builder builder = SlowDiskReportProto.newBuilder();
+      builder.setBasePath(entry.getKey());
+      Map<SlowDiskReports.DiskOp, Double> value = entry.getValue();
+      if (value.get(SlowDiskReports.DiskOp.METADATA) != null) {
+        builder.setMeanMetadataOpLatency(value.get(
+            SlowDiskReports.DiskOp.METADATA));
+      }
+      if (value.get(SlowDiskReports.DiskOp.READ) != null) {
+        builder.setMeanReadIoLatency(value.get(
+            SlowDiskReports.DiskOp.READ));
+      }
+      if (value.get(SlowDiskReports.DiskOp.WRITE) != null) {
+        builder.setMeanWriteIoLatency(value.get(
+            SlowDiskReports.DiskOp.WRITE));
+      }
+      slowDiskInfoProtos.add(builder.build());
+    }
+
+    return slowDiskInfoProtos;
+  }
+
+  public static SlowDiskReports convertSlowDiskInfo(
+      List<SlowDiskReportProto> slowDiskProtos) {
+
+    // No slow disks, or possibly an older DataNode.
+    if (slowDiskProtos == null || slowDiskProtos.size() == 0) {
+      return SlowDiskReports.EMPTY_REPORT;
+    }
+
+    Map<String, Map<SlowDiskReports.DiskOp, Double>> slowDisksMap =
+        new HashMap<>(slowDiskProtos.size());
+    for (SlowDiskReportProto proto : slowDiskProtos) {
+      if (!proto.hasBasePath()) {
+        // The disk basePath should be reported.
+        continue;
+      }
+      Map<SlowDiskReports.DiskOp, Double> latencyMap = new HashMap<>();
+      if (proto.hasMeanMetadataOpLatency()) {
+        latencyMap.put(SlowDiskReports.DiskOp.METADATA,
+            proto.getMeanMetadataOpLatency());
+      }
+      if (proto.hasMeanReadIoLatency()) {
+        latencyMap.put(SlowDiskReports.DiskOp.READ,
+            proto.getMeanReadIoLatency());
+      }
+      if (proto.hasMeanWriteIoLatency()) {
+        latencyMap.put(SlowDiskReports.DiskOp.WRITE,
+            proto.getMeanWriteIoLatency());
+      }
+
+      slowDisksMap.put(proto.getBasePath(), latencyMap);
+    }
+    return SlowDiskReports.create(slowDisksMap);
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 7f8afa9..04ba5aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1508,7 +1508,8 @@ public class DatanodeManager {
       long cacheCapacity, long cacheUsed, int xceiverCount, 
       int maxTransfers, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      @Nonnull SlowDiskReports slowDisks) throws IOException {
     final DatanodeDescriptor nodeinfo;
     try {
       nodeinfo = getDatanode(nodeReg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
index c8a6348..98d0927 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -58,7 +58,7 @@ public class SlowPeerTracker {
 
   /**
    * Time duration after which a report is considered stale. This is
-   * set to DFS_DATANODE_SLOW_PEER_REPORT_INTERVAL_KEY * 3 i.e.
+   * set to DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY * 3 i.e.
    * maintained for at least two successive reports.
    */
   private final long reportValidityMs;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index ec8c79b..235d30f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -499,11 +500,15 @@ class BPServiceActor implements Runnable {
         .getVolumeFailureSummary();
     int numFailedVolumes = volumeFailureSummary != null ?
         volumeFailureSummary.getFailedStorageLocations().length : 0;
-    final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
+    final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
     final SlowPeerReports slowPeers =
-        slowPeersReportDue && dn.getPeerMetrics() != null ?
+        outliersReportDue && dn.getPeerMetrics() != null ?
             SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
             SlowPeerReports.EMPTY_REPORT;
+    final SlowDiskReports slowDisks =
+        outliersReportDue && dn.getDiskMetrics() != null ?
+            SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
+            SlowDiskReports.EMPTY_REPORT;
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
@@ -513,11 +518,12 @@ class BPServiceActor implements Runnable {
         numFailedVolumes,
         volumeFailureSummary,
         requestBlockReportLease,
-        slowPeers);
+        slowPeers,
+        slowDisks);
 
-    if (slowPeersReportDue) {
+    if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
-      scheduler.scheduleNextSlowPeerReport();
+      scheduler.scheduleNextOutlierReport();
     }
     return response;
   }
@@ -1097,7 +1103,7 @@ class BPServiceActor implements Runnable {
     boolean resetBlockReportTime = true;
 
     @VisibleForTesting
-    volatile long nextSlowPeersReportTime = monotonicNow();
+    volatile long nextOutliersReportTime = monotonicNow();
 
     private final AtomicBoolean forceFullBlockReport =
         new AtomicBoolean(false);
@@ -1105,14 +1111,14 @@ class BPServiceActor implements Runnable {
     private final long heartbeatIntervalMs;
     private final long lifelineIntervalMs;
     private final long blockReportIntervalMs;
-    private final long slowPeersReportIntervalMs;
+    private final long outliersReportIntervalMs;
 
     Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
-              long blockReportIntervalMs, long slowPeersReportIntervalMs) {
+              long blockReportIntervalMs, long outliersReportIntervalMs) {
       this.heartbeatIntervalMs = heartbeatIntervalMs;
       this.lifelineIntervalMs = lifelineIntervalMs;
       this.blockReportIntervalMs = blockReportIntervalMs;
-      this.slowPeersReportIntervalMs = slowPeersReportIntervalMs;
+      this.outliersReportIntervalMs = outliersReportIntervalMs;
       scheduleNextLifeline(nextHeartbeatTime);
     }
 
@@ -1145,8 +1151,8 @@ class BPServiceActor implements Runnable {
       lastBlockReportTime = blockReportTime;
     }
 
-    void scheduleNextSlowPeerReport() {
-      nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs;
+    void scheduleNextOutlierReport() {
+      nextOutliersReportTime = monotonicNow() + outliersReportIntervalMs;
     }
 
     long getLastHearbeatTime() {
@@ -1175,8 +1181,8 @@ class BPServiceActor implements Runnable {
       return nextBlockReportTime - curTime <= 0;
     }
 
-    boolean isSlowPeersReportDue(long curTime) {
-      return nextSlowPeersReportTime - curTime <= 0;
+    boolean isOutliersReportDue(long curTime) {
+      return nextOutliersReportTime - curTime <= 0;
     }
 
     void forceFullBlockReportNow() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
index b6d5cf0..a543c69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class DataNodeDiskMetrics {
   private volatile boolean shouldRun;
   private OutlierDetector slowDiskDetector;
   private Daemon slowDiskDetectionDaemon;
-  private volatile Map<String, Map<DiskOutlierDetectionOp, Double>>
+  private volatile Map<String, Map<DiskOp, Double>>
       diskOutliersStats = Maps.newHashMap();
 
   public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
@@ -144,13 +145,13 @@ public class DataNodeDiskMetrics {
       diskOutliersSet.addAll(writeIoOutliers.keySet());
     }
 
-    Map<String, Map<DiskOutlierDetectionOp, Double>> diskStats =
+    Map<String, Map<DiskOp, Double>> diskStats =
         Maps.newHashMap();
     for (String disk : diskOutliersSet) {
-      Map<DiskOutlierDetectionOp, Double> diskStat = Maps.newHashMap();
-      diskStat.put(DiskOutlierDetectionOp.METADATA, metadataOpStats.get(disk));
-      diskStat.put(DiskOutlierDetectionOp.READ, readIoStats.get(disk));
-      diskStat.put(DiskOutlierDetectionOp.WRITE, writeIoStats.get(disk));
+      Map<DiskOp, Double> diskStat = Maps.newHashMap();
+      diskStat.put(DiskOp.METADATA, metadataOpStats.get(disk));
+      diskStat.put(DiskOp.READ, readIoStats.get(disk));
+      diskStat.put(DiskOp.WRITE, writeIoStats.get(disk));
       diskStats.put(disk, diskStat);
     }
 
@@ -158,17 +159,7 @@ public class DataNodeDiskMetrics {
     LOG.debug("Updated disk outliers.");
   }
 
-  /**
-   * Lists the types of operations on which disk latencies are measured.
-   */
-  public enum DiskOutlierDetectionOp {
-    METADATA,
-    READ,
-    WRITE
-  }
-
-  public Map<String,
-      Map<DiskOutlierDetectionOp, Double>> getDiskOutliersStats() {
+  public Map<String, Map<DiskOp, Double>> getDiskOutliersStats() {
     return diskOutliersStats;
   }
 
@@ -186,8 +177,12 @@ public class DataNodeDiskMetrics {
    * Use only for testing.
    */
   @VisibleForTesting
-  public void addSlowDiskForTesting(String slowDiskPath) {
-    diskOutliersStats.put(slowDiskPath,
-        ImmutableMap.<DiskOutlierDetectionOp, Double>of());
+  public void addSlowDiskForTesting(String slowDiskPath,
+      Map<DiskOp, Double> latencies) {
+    if (latencies == null) {
+      diskOutliersStats.put(slowDiskPath, ImmutableMap.<DiskOp, Double>of());
+    } else {
+      diskOutliersStats.put(slowDiskPath, latencies);
+    }
   }
 }
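
A note on the changed test hook above: addSlowDiskForTesting now takes an
optional latency map. A short sketch of both forms, with invented paths and
values (SlowDiskTestHelper is a hypothetical class, not part of the commit):

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;

import java.util.Map;

class SlowDiskTestHelper {
  // Seeds fake slow-disk stats on a DataNode under test.
  static void seedSlowDisks(DataNode dn) {
    Map<DiskOp, Double> latencies = ImmutableMap.of(
        DiskOp.METADATA, 0.5,
        DiskOp.READ, 1.2);
    dn.getDiskMetrics().addSlowDiskForTesting("/data/disk1", latencies);

    // Passing null records the disk with an empty latency map, which is
    // what the TestDataNodeMXBean change below relies on.
    dn.getDiskMetrics().addSlowDiskForTesting("/data/disk2", null);
  }
}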

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 54907bd..c22a70a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -91,6 +91,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.monotonicNow;
 import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME;
@@ -3625,7 +3626,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       int xceiverCount, int xmitsInProgress, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      @Nonnull SlowDiskReports slowDisks) throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3634,7 +3636,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
           xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
-          slowPeers);
+          slowPeers, slowDisks);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 59a741e..c8c784e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -152,6 +152,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -1416,13 +1417,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       int xmitsInProgress, int xceiverCount,
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      @Nonnull SlowDiskReports slowDisks) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
-        slowPeers);
+        slowPeers, slowDisks);
   }
 
   @Override // DatanodeProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index a15aea8..a8e3edd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -121,7 +121,8 @@ public interface DatanodeProtocol {
                                        int failedVolumes,
                                        VolumeFailureSummary volumeFailureSummary,
                                        boolean requestFullBlockReportLease,
-                                       @Nonnull SlowPeerReports slowPeers)
+                                       @Nonnull SlowPeerReports slowPeers,
+                                       @Nonnull SlowDiskReports slowDisks)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 393cc43..0e4b2fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -186,6 +186,7 @@ message VolumeFailureSummaryProto {
  * cacheUsed - amount of cache used
  * volumeFailureSummary - info about volume failures
  * slowPeers - info about peer DataNodes that are suspected to be slow.
+ * slowDisks - info about DataNode disks that are suspected to be slow.
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -198,6 +199,7 @@ message HeartbeatRequestProto {
   optional VolumeFailureSummaryProto volumeFailureSummary = 8;
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
+  repeated SlowDiskReportProto slowDisks = 11;
 }
 
 /**
@@ -393,6 +395,19 @@ message SlowPeerReportProto {
 }
 
 /**
+ * Information about a single slow disk that may be reported by
+ * the DataNode to the NameNode as part of the heartbeat request.
+ * The message includes the disk's basePath, mean metadata op latency,
+ * mean read io latency and mean write io latency as observed by the DataNode.
+ */
+message SlowDiskReportProto {
+  optional string basePath = 1;
+  optional double meanMetadataOpLatency = 2;
+  optional double meanReadIoLatency = 3;
+  optional double meanWriteIoLatency = 4;
+}
+
+/**
  * Protocol used from datanode to the namenode
  * See the request and response for details of rpc call.
  */
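
To make the optional-field semantics of SlowDiskReportProto concrete, here
is a sketch of building one report proto and reading it back the way PBHelper
does, setting only the latencies that were actually measured (not part of
the commit; the path and value are invented):

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowDiskReportProto;

public class SlowDiskProtoExample {
  public static void main(String[] args) {
    // Only the base path and the write latency are set; the other
    // optional fields remain unset on the wire.
    SlowDiskReportProto proto = SlowDiskReportProto.newBuilder()
        .setBasePath("/data/disk1")
        .setMeanWriteIoLatency(87.5)
        .build();

    // Receivers check hasXxx() before getXxx(), as the new
    // PBHelper.convertSlowDiskInfo() does.
    if (proto.hasMeanReadIoLatency()) {
      System.out.println("read: " + proto.getMeanReadIoLatency());
    } else {
      System.out.println("no read latency reported");
    }
  }
}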

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 3280563..9333efc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
@@ -26,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -700,4 +702,30 @@ public class TestPBHelper {
         "Expected empty map:" + ", got map:" + slowPeersConverted2,
         slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT));
   }
+
+  @Test
+  public void testSlowDiskInfoPBHelper() {
+    // Test with a map that has a few slow disk entries.
+    final SlowDiskReports slowDisks = SlowDiskReports.create(
+        ImmutableMap.<String, Map<SlowDiskReports.DiskOp, Double>>of(
+            "disk1", ImmutableMap.of(SlowDiskReports.DiskOp.METADATA, 0.5),
+            "disk2", ImmutableMap.of(SlowDiskReports.DiskOp.READ, 1.0,
+                SlowDiskReports.DiskOp.WRITE, 1.0),
+            "disk3", ImmutableMap.of(SlowDiskReports.DiskOp.METADATA, 1.2,
+                SlowDiskReports.DiskOp.READ, 1.5,
+                SlowDiskReports.DiskOp.WRITE, 1.3)));
+    SlowDiskReports slowDisksConverted1 = PBHelper.convertSlowDiskInfo(
+        PBHelper.convertSlowDiskInfo(slowDisks));
+    assertTrue(
+        "Expected map:" + slowDisks + ", got map:" +
+            slowDisksConverted1.getSlowDisks(),
+        slowDisksConverted1.equals(slowDisks));
+
+    // Test with an empty map
+    SlowDiskReports slowDisksConverted2 = PBHelper.convertSlowDiskInfo(
+        PBHelper.convertSlowDiskInfo(SlowDiskReports.EMPTY_REPORT));
+    assertTrue(
+        "Expected empty map:" + ", got map:" + slowDisksConverted2,
+        slowDisksConverted2.equals(SlowDiskReports.EMPTY_REPORT));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index fbfccbe..f234bcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -113,7 +114,8 @@ public class TestNameNodePrunesMissingStorages {
       // Stop the DataNode and send fake heartbeat with missing storage.
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
-          0, null, true, SlowPeerReports.EMPTY_REPORT);
+          0, null, true, SlowPeerReports.EMPTY_REPORT,
+          SlowDiskReports.EMPTY_REPORT);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index cf43fd0..876a854 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -138,7 +139,8 @@ public class InternalDataNodeTestUtils {
             Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
-            Mockito.any(SlowPeerReports.class))).thenReturn(
+            Mockito.any(SlowPeerReports.class),
+            Mockito.any(SlowDiskReports.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index c6b38ee..b9220e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
@@ -154,7 +155,8 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean(),
-          Mockito.any(SlowPeerReports.class));
+          Mockito.any(SlowPeerReports.class),
+          Mockito.any(SlowDiskReports.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index fecddc5..ab1ad48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -189,7 +190,8 @@ public class TestBlockRecovery {
             Mockito.anyInt(),
             Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
-            Mockito.any(SlowPeerReports.class)))
+            Mockito.any(SlowPeerReports.class),
+            Mockito.any(SlowDiskReports.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index 6435d4d..753c3a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -51,7 +51,7 @@ public class TestBpServiceActorScheduler {
   private static final long HEARTBEAT_INTERVAL_MS = 5000;      // 5 seconds
   private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
   private static final long BLOCK_REPORT_INTERVAL_MS = 10000;  // 10 seconds
-  private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000;  // 10 seconds
+  private static final long OUTLIER_REPORT_INTERVAL_MS = 10000;  // 10 seconds
   private final Random random = new Random(System.nanoTime());
 
   @Test
@@ -182,15 +182,15 @@ public class TestBpServiceActorScheduler {
   }
 
   @Test
-  public void testSlowPeerReportScheduling() {
+  public void testOutlierReportScheduling() {
     for (final long now : getTimestamps()) {
       Scheduler scheduler = makeMockScheduler(now);
-      assertTrue(scheduler.isSlowPeersReportDue(now));
-      scheduler.scheduleNextSlowPeerReport();
-      assertFalse(scheduler.isSlowPeersReportDue(now));
-      assertFalse(scheduler.isSlowPeersReportDue(now + 1));
-      assertTrue(scheduler.isSlowPeersReportDue(
-          now + SLOW_PEER_REPORT_INTERVAL_MS));
+      assertTrue(scheduler.isOutliersReportDue(now));
+      scheduler.scheduleNextOutlierReport();
+      assertFalse(scheduler.isOutliersReportDue(now));
+      assertFalse(scheduler.isOutliersReportDue(now + 1));
+      assertTrue(scheduler.isOutliersReportDue(
+          now + OUTLIER_REPORT_INTERVAL_MS));
     }
   }
 
@@ -198,11 +198,11 @@ public class TestBpServiceActorScheduler {
     LOG.info("Using now = " + now);
     Scheduler mockScheduler = spy(new Scheduler(
         HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
-        BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS));
+        BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS));
     doReturn(now).when(mockScheduler).monotonicNow();
     mockScheduler.nextBlockReportTime = now;
     mockScheduler.nextHeartbeatTime = now;
-    mockScheduler.nextSlowPeersReportTime = now;
+    mockScheduler.nextOutliersReportTime = now;
     return mockScheduler;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index 8a9f0b8..28427bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
+
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -169,7 +171,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             any(VolumeFailureSummary.class),
             anyBoolean(),
-            any(SlowPeerReports.class));
+            any(SlowPeerReports.class),
+            any(SlowDiskReports.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -233,7 +236,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             any(VolumeFailureSummary.class),
             anyBoolean(),
-            any(SlowPeerReports.class));
+            any(SlowPeerReports.class),
+            any(SlowDiskReports.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
index 89a96be..1fcde6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
@@ -221,7 +221,7 @@ public class TestDataNodeMXBean {
       Assert.assertEquals(datanodes.size(), 1);
       DataNode datanode = datanodes.get(0);
       String slowDiskPath = "test/data1/slowVolume";
-      datanode.getDiskMetrics().addSlowDiskForTesting(slowDiskPath);
+      datanode.getDiskMetrics().addSlowDiskForTesting(slowDiskPath, null);
 
       MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
       ObjectName mxbeanName = new ObjectName(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index c94f74e..bb1d9ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -220,7 +221,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
-           Mockito.any(SlowPeerReports.class));
+           Mockito.any(SlowPeerReports.class),
+           Mockito.any(SlowDiskReports.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index eb015c0..28bf13b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -173,7 +174,8 @@ public class TestFsDatasetCache {
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
         anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
-        anyBoolean(), any(SlowPeerReports.class));
+        anyBoolean(), any(SlowPeerReports.class),
+        any(SlowDiskReports.class));
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 2b793e9..5f62ddb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -108,7 +109,8 @@ public class TestStorageReport {
         captor.capture(),
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
-        Mockito.any(SlowPeerReports.class));
+        Mockito.any(SlowPeerReports.class),
+        Mockito.any(SlowDiskReports.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 76ccd40..b65f111 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -953,7 +954,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT).getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+          .getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1003,7 +1005,8 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT).getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+          .getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 0810276..ae55366 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
@@ -119,7 +120,7 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cdda4ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index a72b2a2..033acf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -133,7 +134,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT).getCommands();
+            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+            .getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

