HDDS-163. Add Datanode heartbeat dispatcher in SCM.
Contributed by Nandakumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ddd09d59
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ddd09d59
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ddd09d59

Branch: refs/heads/HDDS-48
Commit: ddd09d59f3d9825f068026622720914e04c2e1d6
Parents: 7547740
Author: Anu Engineer <aengin...@apache.org>
Authored: Wed Jun 13 20:18:22 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Wed Jun 13 20:18:22 2018 -0700

----------------------------------------------------------------------
 .../container/common/report/ReportManager.java  |   3 +-
 .../scm/server/SCMDatanodeProtocolServer.java   | 119 +++++-------
 .../SCMDatanodeContainerReportHandler.java      |  76 ++++++++
 .../report/SCMDatanodeHeartbeatDispatcher.java  | 189 +++++++++++++++++++
 .../report/SCMDatanodeNodeReportHandler.java    |  43 +++++
 .../server/report/SCMDatanodeReportHandler.java |  83 ++++++++
 .../report/SCMDatanodeReportHandlerFactory.java |  82 ++++++++
 .../hdds/scm/server/report/package-info.java    |  57 ++++++
 .../TestSCMDatanodeContainerReportHandler.java  |  34 ++++
 .../TestSCMDatanodeHeartbeatDispatcher.java     | 138 ++++++++++++++
 .../TestSCMDatanodeNodeReportHandler.java       |  36 ++++
 .../TestSCMDatanodeReportHandlerFactory.java    |  51 +++++
 .../hdds/scm/server/report/package-info.java    |  21 +++
 .../apache/hadoop/ozone/scm/TestSCMMetrics.java |  20 +-
 14 files changed, 875 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
index c09282e..8097cd6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
@@ -50,7 +50,8 @@ public final class ReportManager {
                         List<ReportPublisher> publishers) {
     this.context = context;
     this.publishers = publishers;
-    this.executorService = HadoopExecutors.newScheduledThreadPool(1,
+    this.executorService = HadoopExecutors.newScheduledThreadPool(
+        publishers.size(),
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Datanode ReportManager Thread - %d").build());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 1b1645d..7d16161 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -69,7 +69,7 @@ import static org.apache.hadoop.hdds.protocol.proto
 
 
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
+import org.apache.hadoop.hdds.scm.server.report.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -114,6 +114,7 @@ public class SCMDatanodeProtocolServer implements
 
   private final StorageContainerManager scm;
   private final InetSocketAddress datanodeRpcAddress;
+  private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
 
   public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
       StorageContainerManager scm)  throws IOException {
@@ -148,14 +149,22 @@ public class SCMDatanodeProtocolServer implements
         updateRPCListenAddress(
             conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr,
             datanodeRpcServer);
+
+    heartbeatDispatcher = SCMDatanodeHeartbeatDispatcher.newBuilder(conf, scm)
+        .addHandlerFor(NodeReportProto.class)
+        .addHandlerFor(ContainerReportsProto.class)
+        .build();
   }
 
-  public InetSocketAddress getDatanodeRpcAddress() {
-    return datanodeRpcAddress;
+  public void start() {
+    LOG.info(
+        StorageContainerManager.buildRpcServerStartMessage(
+            "RPC server for DataNodes", datanodeRpcAddress));
+    datanodeRpcServer.start();
   }
 
-  public RPC.Server getDatanodeRpcServer() {
-    return datanodeRpcServer;
+  public InetSocketAddress getDatanodeRpcAddress() {
+    return datanodeRpcAddress;
   }
 
   @Override
@@ -167,25 +176,6 @@ public class SCMDatanodeProtocolServer implements
   }
 
   @Override
-  public SCMHeartbeatResponseProto sendHeartbeat(
-      SCMHeartbeatRequestProto heartbeat)
-      throws IOException {
-    // TODO: Add a heartbeat dispatcher.
-    DatanodeDetails datanodeDetails = DatanodeDetails
-        .getFromProtoBuf(heartbeat.getDatanodeDetails());
-    NodeReportProto nodeReport = heartbeat.getNodeReport();
-    List<SCMCommand> commands =
-        scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
-    List<SCMCommandProto> cmdResponses = new LinkedList<>();
-    for (SCMCommand cmd : commands) {
-      cmdResponses.add(getCommandResponse(cmd));
-    }
-    return SCMHeartbeatResponseProto.newBuilder()
-        .setDatanodeUUID(datanodeDetails.getUuidString())
-        .addAllCommands(cmdResponses).build();
-  }
-
-  @Override
   public SCMRegisteredResponseProto register(
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
       NodeReportProto nodeReport,
@@ -216,36 +206,27 @@ public class SCMDatanodeProtocolServer implements
         .build();
   }
 
-  public void processContainerReports(DatanodeDetails datanodeDetails,
-                                      ContainerReportsProto reports)
+  @Override
+  public SCMHeartbeatResponseProto sendHeartbeat(
+      SCMHeartbeatRequestProto heartbeat)
       throws IOException {
-    updateContainerReportMetrics(datanodeDetails, reports);
-    // should we process container reports async?
-    scm.getScmContainerManager()
-        .processContainerReports(datanodeDetails, reports);
-  }
+    heartbeatDispatcher.dispatch(heartbeat);
 
-  private void updateContainerReportMetrics(DatanodeDetails datanodeDetails,
-                                            ContainerReportsProto reports) {
-    ContainerStat newStat = new ContainerStat();
-    for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
-        .getReportsList()) {
-      newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
-          info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
-          info.getReadCount(), info.getWriteCount()));
+    // TODO: Remove the below code after SCM refactoring.
+    DatanodeDetails datanodeDetails = DatanodeDetails
+        .getFromProtoBuf(heartbeat.getDatanodeDetails());
+    NodeReportProto nodeReport = heartbeat.getNodeReport();
+    List<SCMCommand> commands =
+        scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
+    List<SCMCommandProto> cmdResponses = new LinkedList<>();
+    for (SCMCommand cmd : commands) {
+      cmdResponses.add(getCommandResponse(cmd));
     }
-    // update container metrics
-    StorageContainerManager.getMetrics().setLastContainerStat(newStat);
-
-    // Update container stat entry, this will trigger a removal operation if it
-    // exists in cache.
-    String datanodeUuid = datanodeDetails.getUuidString();
-    scm.getContainerReportCache().put(datanodeUuid, newStat);
-    // update global view container metrics
-    StorageContainerManager.getMetrics().incrContainerStat(newStat);
+    return SCMHeartbeatResponseProto.newBuilder()
+        .setDatanodeUUID(datanodeDetails.getUuidString())
+        .addAllCommands(cmdResponses).build();
   }
 
-
   @Override
   public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
       ContainerBlocksDeletionACKProto acks) throws IOException {
@@ -271,28 +252,6 @@ public class SCMDatanodeProtocolServer implements
         .getDefaultInstanceForType();
   }
 
-  public void start() {
-    LOG.info(
-        StorageContainerManager.buildRpcServerStartMessage(
-            "RPC server for DataNodes", getDatanodeRpcAddress()));
-    getDatanodeRpcServer().start();
-  }
-
-  public void stop() {
-    try {
-      LOG.info("Stopping the RPC server for DataNodes");
-      datanodeRpcServer.stop();
-    } catch (Exception ex) {
-      LOG.error(" datanodeRpcServer stop failed.", ex);
-    }
-    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
-  }
-
-  public void join() throws InterruptedException {
-    LOG.trace("Join RPC server for DataNodes");
-    datanodeRpcServer.join();
-  }
-
   /**
    * Returns a SCMCommandRepose from the SCM Command.
    *
@@ -338,4 +297,22 @@ public class SCMDatanodeProtocolServer implements
       throw new IllegalArgumentException("Not implemented");
     }
   }
+
+
+  public void join() throws InterruptedException {
+    LOG.trace("Join RPC server for DataNodes");
+    datanodeRpcServer.join();
+  }
+
+  public void stop() {
+    try {
+      LOG.info("Stopping the RPC server for DataNodes");
+      datanodeRpcServer.stop();
+      heartbeatDispatcher.shutdown();
+    } catch (Exception ex) {
+      LOG.error(" datanodeRpcServer stop failed.", ex);
+    }
+    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java
new file mode 100644
index 0000000..00ce94d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handler for Datanode Container Report.
+ */
+public class SCMDatanodeContainerReportHandler extends
+    SCMDatanodeReportHandler<ContainerReportsProto> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SCMDatanodeContainerReportHandler.class);
+
+  @Override
+  public void processReport(DatanodeDetails datanodeDetails,
+                            ContainerReportsProto report) throws IOException {
+    LOG.trace("Processing container report from {}.", datanodeDetails);
+    updateContainerReportMetrics(datanodeDetails, report);
+    getSCM().getScmContainerManager()
+        .processContainerReports(datanodeDetails, report);
+  }
+
+  /**
+   * Updates container report metrics in SCM.
+   *
+   * @param datanodeDetails Datanode Information
+   * @param reports Container Reports
+   */
+  private void updateContainerReportMetrics(DatanodeDetails datanodeDetails,
+                                            ContainerReportsProto reports) {
+    ContainerStat newStat = new ContainerStat();
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
+        .getReportsList()) {
+      newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
+          info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
+          info.getReadCount(), info.getWriteCount()));
+    }
+    // update container metrics
+    StorageContainerManager.getMetrics().setLastContainerStat(newStat);
+
+    // Update container stat entry, this will trigger a removal operation if it
+    // exists in cache.
+    String datanodeUuid = datanodeDetails.getUuidString();
+    getSCM().getContainerReportCache().put(datanodeUuid, newStat);
+    // update global view container metrics
+    StorageContainerManager.getMetrics().incrContainerStat(newStat);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java
new file mode 100644
index 0000000..d50edff
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * This class is responsible for dispatching heartbeat from datanode to
+ * appropriate ReportHandlers at SCM.
+ * Only one handler per report is supported now, it's very easy to support
+ * multiple handlers for a report.
+ */
+public final class SCMDatanodeHeartbeatDispatcher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SCMDatanodeHeartbeatDispatcher.class);
+
+  /**
+   * This stores Report to Handler mapping.
+   */
+  private final Map<Class<? extends GeneratedMessage>,
+      SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers;
+
+  /**
+   * Executor service which will be used for processing reports.
+   */
+  private final ExecutorService executorService;
+
+  /**
+   * Constructs SCMDatanodeHeartbeatDispatcher instance with the given
+   * handlers.
+   *
+   * @param handlers report to report handler mapping
+   */
+  private SCMDatanodeHeartbeatDispatcher(Map<Class<? extends GeneratedMessage>,
+      SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers) {
+    this.handlers = handlers;
+    this.executorService = HadoopExecutors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("SCMDatanode Heartbeat Dispatcher Thread - %d")
+            .build());
+  }
+
+  /**
+   * Dispatches heartbeat to registered handlers.
+   *
+   * @param heartbeat heartbeat to be dispatched.
+   */
+  public void dispatch(SCMHeartbeatRequestProto heartbeat) {
+    DatanodeDetails datanodeDetails = DatanodeDetails
+        .getFromProtoBuf(heartbeat.getDatanodeDetails());
+    if (heartbeat.hasNodeReport()) {
+      processReport(datanodeDetails, heartbeat.getNodeReport());
+    }
+    if (heartbeat.hasContainerReport()) {
+      processReport(datanodeDetails, heartbeat.getContainerReport());
+    }
+  }
+
+  /**
+   * Invokes appropriate ReportHandler and submits the task to executor
+   * service for processing.
+   *
+   * @param datanodeDetails Datanode Information
+   * @param report Report to be processed
+   */
+  @SuppressWarnings("unchecked")
+  private void processReport(DatanodeDetails datanodeDetails,
+                             GeneratedMessage report) {
+    executorService.submit(() -> {
+      try {
+        SCMDatanodeReportHandler handler = handlers.get(report.getClass());
+        handler.processReport(datanodeDetails, report);
+      } catch (IOException ex) {
+        LOG.error("Exception wile processing report {}, from {}",
+            report.getClass(), datanodeDetails, ex);
+      }
+    });
+  }
+
+  /**
+   * Shuts down SCMDatanodeHeartbeatDispatcher.
+   */
+  public void shutdown() {
+    executorService.shutdown();
+  }
+
+  /**
+   * Returns a new Builder to construct {@link SCMDatanodeHeartbeatDispatcher}.
+   *
+   * @param conf Configuration to be used by SCMDatanodeHeartbeatDispatcher
+   * @param scm {@link StorageContainerManager} instance to be used by report
+   *            handlers
+   *
+   * @return {@link SCMDatanodeHeartbeatDispatcher.Builder} instance
+   */
+  public static Builder newBuilder(Configuration conf,
+                                   StorageContainerManager scm) {
+    return new Builder(conf, scm);
+  }
+
+  /**
+   * Builder for SCMDatanodeHeartbeatDispatcher.
+   */
+  public static class Builder {
+
+    private final SCMDatanodeReportHandlerFactory reportHandlerFactory;
+    private final Map<Class<? extends GeneratedMessage>,
+        SCMDatanodeReportHandler<? extends GeneratedMessage>> report2handler;
+
+    /**
+     * Constructs SCMDatanodeHeartbeatDispatcher.Builder instance.
+     *
+     * @param conf Configuration object to be used.
+     * @param scm StorageContainerManager instance to be used for report
+     *            handler initialization.
+     */
+    private Builder(Configuration conf, StorageContainerManager scm) {
+      this.report2handler = new HashMap<>();
+      this.reportHandlerFactory =
+          new SCMDatanodeReportHandlerFactory(conf, scm);
+    }
+
+    /**
+     * Adds new report handler for the given report.
+     *
+     * @param report Report for which handler has to be added
+     *
+     * @return Builder
+     */
+    public Builder addHandlerFor(Class<? extends GeneratedMessage> report) {
+      report2handler.put(report, reportHandlerFactory.getHandlerFor(report));
+      return this;
+    }
+
+    /**
+     * Associates the given report handler for the given report.
+     *
+     * @param report Report to be associated with
+     * @param handler Handler to be used for the report
+     *
+     * @return Builder
+     */
+    public Builder addHandler(Class<? extends GeneratedMessage> report,
+        SCMDatanodeReportHandler<? extends GeneratedMessage> handler) {
+      report2handler.put(report, handler);
+      return this;
+    }
+
+    /**
+     * Builds and returns {@link SCMDatanodeHeartbeatDispatcher} instance.
+     *
+     * @return SCMDatanodeHeartbeatDispatcher
+     */
+    public SCMDatanodeHeartbeatDispatcher build() {
+      return new SCMDatanodeHeartbeatDispatcher(report2handler);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java
new file mode 100644
index 0000000..fb89b02
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handles Datanode Node Report.
+ */
+public class SCMDatanodeNodeReportHandler extends
+    SCMDatanodeReportHandler<NodeReportProto> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SCMDatanodeNodeReportHandler.class);
+
+  @Override
+  public void processReport(DatanodeDetails datanodeDetails,
+                            NodeReportProto report) throws IOException {
+    LOG.debug("Processing node report from {}.", datanodeDetails);
+    //TODO: add logic to process node report.
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java
new file mode 100644
index 0000000..d338649
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+
+import java.io.IOException;
+
+/**
+ * Datanode Report handlers should implement this interface in order to get
+ * call back whenever the report is received from datanode.
+ *
+ * @param <T> Type of report the handler is interested in.
+ */
+public abstract class SCMDatanodeReportHandler<T extends GeneratedMessage>
+    implements Configurable {
+
+  private Configuration config;
+  private StorageContainerManager scm;
+
+  /**
+   * Initializes SCMDatanodeReportHandler and associates it with the given
+   * StorageContainerManager instance.
+   *
+   * @param storageContainerManager StorageContainerManager instance to be
+   *                                associated with.
+   */
+  public void init(StorageContainerManager storageContainerManager) {
+    this.scm = storageContainerManager;
+  }
+
+  /**
+   * Returns the associated StorageContainerManager instance. This will be
+   * used by the ReportHandler implementations.
+   *
+   * @return {@link StorageContainerManager}
+   */
+  protected StorageContainerManager getSCM() {
+    return scm;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.config = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return config;
+  }
+
+  /**
+   * Processes the report received from datanode. Each ReportHandler
+   * implementation is responsible for providing the logic to process the
+   * report it's interested in.
+   *
+   * @param datanodeDetails Datanode Information
+   * @param report Report to be processed
+   *
+   * @throws IOException In case of any exception
+   */
+  abstract void processReport(DatanodeDetails datanodeDetails, T report)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java
new file mode 100644
index 0000000..e88495f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Factory class to construct {@link SCMDatanodeReportHandler} given a report.
+ */
+public class SCMDatanodeReportHandlerFactory {
+
+  private final Configuration conf;
+  private final StorageContainerManager scm;
+  private final Map<Class<? extends GeneratedMessage>,
+      Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>>>
+      report2handler;
+
+  /**
+   * Constructs {@link SCMDatanodeReportHandler} instance.
+   *
+   * @param conf Configuration to be passed to the
+   *               {@link SCMDatanodeReportHandler}
+   */
+  public SCMDatanodeReportHandlerFactory(Configuration conf,
+                                         StorageContainerManager scm) {
+    this.conf = conf;
+    this.scm = scm;
+    this.report2handler = new HashMap<>();
+
+    report2handler.put(NodeReportProto.class,
+        SCMDatanodeNodeReportHandler.class);
+    report2handler.put(ContainerReportsProto.class,
+        SCMDatanodeContainerReportHandler.class);
+  }
+
+  /**
+   * Returns the SCMDatanodeReportHandler for the corresponding report.
+   *
+   * @param report report
+   *
+   * @return report handler
+   */
+  public SCMDatanodeReportHandler<? extends GeneratedMessage> getHandlerFor(
+      Class<? extends GeneratedMessage> report) {
+    Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>>
+        handlerClass = report2handler.get(report);
+    if (handlerClass == null) {
+      throw new RuntimeException("No handler found for report " + report);
+    }
+    SCMDatanodeReportHandler<? extends GeneratedMessage> instance =
+    ReflectionUtils.newInstance(handlerClass, conf);
+    instance.init(scm);
+    return instance;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java
new file mode 100644
index 0000000..fda3993
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.server.report;
+/**
+ * Handling of all the datanode reports in SCM which are received through
+ * heartbeat is done here.
+ *
+ * SCM Datanode Report Processing Sequence Diagram:
+ *
+ *          SCMDatanode            SCMDatanodeHeartbeat    SCMDatanodeReport
+ *         ProtocolServer              Dispatcher               Handler
+ *                |                        |                        |
+ *                |                        |                        |
+ *                |    construct           |                        |
+ *                |----------------------->|                        |
+ *                |                        |                        |
+ *                |                        |     register           |
+ *                |                        |<-----------------------|
+ *                |                        |                        |
+ *   +------------+------------------------+------------------------+--------+
+ *   | loop       |                        |                        |        |
+ *   |            |                        |                        |        |
+ *   |            |                        |                        |        |
+ *   | heartbeat  |                        |                        |        |
+ * - +----------->|                        |                        |        |
+ *   |   from     |       heartbeat        |                        |        |
+ *   | Datanode   |----------------------->|                        |        |
+ *   |            |                        |       report           |        |
+ *   |            |                        |----------------------->|        |
+ *   |            |                        |                        |        |
+ *   |   DN       |                        |                        |        |
+ * <-+------------|                        |                        |        |
+ *   | commands   |                        |                        |        |
+ *   |            |                        |                        |        |
+ *   +------------+------------------------+------------------------+--------+
+ *                |                        |                        |
+ *                |                        |                        |
+ *                |     shutdown           |                        |
+ *                |----------------------->|                        |
+ *                |                        |                        |
+ *                |                        |                        |
+ *                -                        -                        -
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java
new file mode 100644
index 0000000..776ae88
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Placeholder suite for SCMDatanodeContainerReportHandler; no real cases yet.
+ */
+public class TestSCMDatanodeContainerReportHandler {
+
+  //TODO: add test cases to verify SCMDatanodeContainerReportHandler.
+
+  @Test
+  public void dummyTest() {
+    Assert.assertTrue(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java
new file mode 100644
index 0000000..5d08647
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests that SCMDatanodeHeartbeatDispatcher hands reports to their handlers.
+ */
+public class TestSCMDatanodeHeartbeatDispatcher {
+
+  @Test
+  public void testSCMDatanodeHeartbeatDispatcherBuilder() {
+    Configuration conf = new OzoneConfiguration();
+    SCMDatanodeHeartbeatDispatcher dispatcher =
+        SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
+        .addHandlerFor(NodeReportProto.class)
+        .addHandlerFor(ContainerReportsProto.class)
+        .build();
+    Assert.assertNotNull(dispatcher);
+  }
+
+  @Test
+  public void testNodeReportDispatcher() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    SCMDatanodeNodeReportHandler nodeReportHandler =
+        Mockito.mock(SCMDatanodeNodeReportHandler.class);
+    SCMDatanodeHeartbeatDispatcher dispatcher =
+        SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
+            .addHandler(NodeReportProto.class, nodeReportHandler)
+            .build();
+
+    DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
+    NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
+    SCMHeartbeatRequestProto heartbeat =
+        SCMHeartbeatRequestProto.newBuilder()
+        .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
+        .setNodeReport(nodeReport)
+        .build();
+    dispatcher.dispatch(heartbeat);
+    verify(nodeReportHandler,
+        times(1))
+        .processReport(any(DatanodeDetails.class), eq(nodeReport));
+  }
+
+  @Test
+  public void testContainerReportDispatcher() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    SCMDatanodeContainerReportHandler containerReportHandler =
+        Mockito.mock(SCMDatanodeContainerReportHandler.class);
+    SCMDatanodeHeartbeatDispatcher dispatcher =
+        SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
+            .addHandler(ContainerReportsProto.class, containerReportHandler)
+            .build();
+
+    DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
+    ContainerReportsProto containerReport =
+        ContainerReportsProto.getDefaultInstance();
+    SCMHeartbeatRequestProto heartbeat =
+        SCMHeartbeatRequestProto.newBuilder()
+            .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
+            .setContainerReport(containerReport)
+            .build();
+    dispatcher.dispatch(heartbeat);
+    verify(containerReportHandler,
+        times(1))
+        .processReport(any(DatanodeDetails.class),
+            any(ContainerReportsProto.class));
+  }
+
+  @Test
+  public void testNodeAndContainerReportDispatcher() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    SCMDatanodeNodeReportHandler nodeReportHandler =
+        Mockito.mock(SCMDatanodeNodeReportHandler.class);
+    SCMDatanodeContainerReportHandler containerReportHandler =
+        Mockito.mock(SCMDatanodeContainerReportHandler.class);
+    SCMDatanodeHeartbeatDispatcher dispatcher =
+        SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
+            .addHandler(NodeReportProto.class, nodeReportHandler)
+            .addHandler(ContainerReportsProto.class, containerReportHandler)
+            .build();
+
+    DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
+    NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
+    ContainerReportsProto containerReport =
+        ContainerReportsProto.getDefaultInstance();
+    SCMHeartbeatRequestProto heartbeat =
+        SCMHeartbeatRequestProto.newBuilder()
+            .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
+            .setNodeReport(nodeReport)
+            .setContainerReport(containerReport)
+            .build();
+    dispatcher.dispatch(heartbeat);
+    verify(nodeReportHandler,
+        times(1))
+        .processReport(any(DatanodeDetails.class), any(NodeReportProto.class));
+    verify(containerReportHandler,
+        times(1))
+        .processReport(any(DatanodeDetails.class),
+            any(ContainerReportsProto.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java
new file mode 100644
index 0000000..30a753c
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases to verify SCMDatanodeNodeReportHandler's behavior.
+ */
+public class TestSCMDatanodeNodeReportHandler {
+
+
+  //TODO: add test cases to verify SCMDatanodeNodeReportHandler.
+
+  @Test
+  public void dummyTest() {
+    Assert.assertTrue(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java
new file mode 100644
index 0000000..4b918f7
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that SCMDatanodeReportHandlerFactory maps reports to handler types.
+ */
+public class TestSCMDatanodeReportHandlerFactory {
+
+  @Test
+  public void testNodeReportHandlerConstruction() {
+    Configuration conf = new OzoneConfiguration();
+    SCMDatanodeReportHandlerFactory factory =
+        new SCMDatanodeReportHandlerFactory(conf, null);
+    Assert.assertTrue(factory.getHandlerFor(NodeReportProto.class)
+        instanceof SCMDatanodeNodeReportHandler);
+  }
+
+  @Test
+  public void testContainerReporttHandlerConstruction() {
+    Configuration conf = new OzoneConfiguration();
+    SCMDatanodeReportHandlerFactory factory =
+        new SCMDatanodeReportHandlerFactory(conf, null);
+    Assert.assertTrue(factory.getHandlerFor(ContainerReportsProto.class)
+        instanceof SCMDatanodeContainerReportHandler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java
new file mode 100644
index 0000000..4a3f59f
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.report;
+/**
+ * Contains test-cases to test Datanode report handlers in SCM.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ddd09d59/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
index 1dbe760..ecddf8e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.report
+    .SCMDatanodeContainerReportHandler;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -80,7 +82,11 @@ public class TestSCMMetrics {
       DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails();
       ContainerReportsProto request = createContainerReport(numReport, stat);
       String fstDatanodeUuid = fstDatanodeDetails.getUuidString();
-      scmManager.getDatanodeProtocolServer().processContainerReports(
+      SCMDatanodeContainerReportHandler containerReportHandler =
+          new SCMDatanodeContainerReportHandler();
+      containerReportHandler.setConf(conf);
+      containerReportHandler.init(scmManager);
+      containerReportHandler.processReport(
           fstDatanodeDetails, request);
 
       // verify container stat metrics
@@ -105,7 +111,7 @@ public class TestSCMMetrics {
       DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails();
       request = createContainerReport(1, stat);
       String sndDatanodeUuid = sndDatanodeDetails.getUuidString();
-      scmManager.getDatanodeProtocolServer().processContainerReports(
+      containerReportHandler.processReport(
           sndDatanodeDetails, request);
 
       scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -128,11 +134,11 @@ public class TestSCMMetrics {
       // Re-send reports but with different value for validating
       // the aggregation.
       stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
-      scmManager.getDatanodeProtocolServer().processContainerReports(
+      containerReportHandler.processReport(
           fstDatanodeDetails, createContainerReport(1, stat));
 
       stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
-      scmManager.getDatanodeProtocolServer().processContainerReports(
+      containerReportHandler.processReport(
           sndDatanodeDetails, createContainerReport(1, stat));
 
       // the global container metrics value should be updated
@@ -176,8 +182,12 @@ public class TestSCMMetrics {
 
       DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
           .getDatanodeDetails();
+      SCMDatanodeContainerReportHandler containerReportHandler =
+          new SCMDatanodeContainerReportHandler();
+      containerReportHandler.setConf(conf);
+      containerReportHandler.init(scmManager);
       ContainerReportsProto request = createContainerReport(numReport, stat);
-      scmManager.getDatanodeProtocolServer().processContainerReports(
+      containerReportHandler.processReport(
           datanodeDetails, request);
 
       MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to