This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8ae632  HDDS-2068. Make StorageContainerDatanodeProtocolService message based
e8ae632 is described below

commit e8ae632d4c4f13788b0c42dbf297c8f7b9d889f3
Author: Márton Elek <e...@apache.org>
AuthorDate: Mon Sep 23 16:40:08 2019 +0200

    HDDS-2068. Make StorageContainerDatanodeProtocolService message based
    
    Signed-off-by: Anu Engineer <aengin...@apache.org>
---
 ...inerDatanodeProtocolClientSideTranslatorPB.java |  60 ++++++-----
 ...inerDatanodeProtocolServerSideTranslatorPB.java | 115 ++++++++++++---------
 .../proto/StorageContainerDatanodeProtocol.proto   |  58 ++++++++---
 .../ozone/container/common/SCMTestUtils.java       |   4 +-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java | 102 ++++++++----------
 .../ozone/insight/BaseInsightSubCommand.java       |   4 +-
 .../scm/ScmProtocolBlockLocationInsight.java       |   6 +-
 ...nsight.java => ScmProtocolDatanodeInsight.java} |  27 ++---
 8 files changed, 207 insertions(+), 169 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 4e1e27e..9b44666 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest.Builder;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -38,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -45,6 +49,7 @@ import 
org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.function.Consumer;
 
 /**
  * This class is the client-side translator to translate the requests made on
@@ -97,6 +102,25 @@ public class 
StorageContainerDatanodeProtocolClientSideTranslatorPB
   }
 
   /**
+   * Helper method to wrap the request and send the message.
+   */
+  private SCMDatanodeResponse submitRequest(Type type,
+      Consumer<SCMDatanodeRequest.Builder> builderConsumer) throws IOException 
{
+    final SCMDatanodeResponse response;
+    try {
+      Builder builder = SCMDatanodeRequest.newBuilder()
+          .setCmdType(type);
+      builderConsumer.accept(builder);
+      SCMDatanodeRequest wrapper = builder.build();
+
+      response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+    return response;
+  }
+
+  /**
    * Returns SCM version.
    *
    * @param unused - set to null and unused.
@@ -104,16 +128,11 @@ public class 
StorageContainerDatanodeProtocolClientSideTranslatorPB
    */
   @Override
   public SCMVersionResponseProto getVersion(SCMVersionRequestProto
-      unused) throws IOException {
-    SCMVersionRequestProto request =
-        SCMVersionRequestProto.newBuilder().build();
-    final SCMVersionResponseProto response;
-    try {
-      response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException ex) {
-      throw ProtobufHelper.getRemoteException(ex);
-    }
-    return response;
+      request) throws IOException {
+    return submitRequest(Type.GetVersion,
+        (builder) -> builder
+            .setGetVersionRequest(SCMVersionRequestProto.newBuilder().build()))
+        .getGetVersionResponse();
   }
 
   /**
@@ -126,13 +145,9 @@ public class 
StorageContainerDatanodeProtocolClientSideTranslatorPB
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(
       SCMHeartbeatRequestProto heartbeat) throws IOException {
-    final SCMHeartbeatResponseProto resp;
-    try {
-      resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return resp;
+    return submitRequest(Type.SendHeartbeat,
+        (builder) -> builder.setSendHeartbeatRequest(heartbeat))
+        .getSendHeartbeatResponse();
   }
 
   /**
@@ -155,13 +170,8 @@ public class 
StorageContainerDatanodeProtocolClientSideTranslatorPB
     req.setContainerReport(containerReportsRequestProto);
     req.setPipelineReports(pipelineReportsProto);
     req.setNodeReport(nodeReport);
-    final SCMRegisteredResponseProto response;
-    try {
-      response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return response;
+    return submitRequest(Type.Register,
+        (builder) -> builder.setRegisterRequest(req))
+        .getRegisterResponse();
   }
-
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 8622332..ed704eb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -16,29 +16,24 @@
  */
 package org.apache.hadoop.ozone.protocolPB;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import java.io.IOException;
+
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Status;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 
-import java.io.IOException;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is the server-side translator that forwards requests received on
@@ -48,47 +43,71 @@ import java.io.IOException;
 public class StorageContainerDatanodeProtocolServerSideTranslatorPB
     implements StorageContainerDatanodeProtocolPB {
 
+  private static final Logger LOG = LoggerFactory
+      .getLogger(StorageContainerDatanodeProtocolServerSideTranslatorPB.class);
+
   private final StorageContainerDatanodeProtocol impl;
+  private final OzoneProtocolMessageDispatcher<SCMDatanodeRequest,
+      SCMDatanodeResponse> dispatcher;
 
   public StorageContainerDatanodeProtocolServerSideTranslatorPB(
-      StorageContainerDatanodeProtocol impl) {
+      StorageContainerDatanodeProtocol impl,
+      ProtocolMessageMetrics protocolMessageMetrics) {
     this.impl = impl;
+    dispatcher =
+        new OzoneProtocolMessageDispatcher<>("SCMDatanodeProtocol",
+            protocolMessageMetrics,
+            LOG);
   }
 
-  @Override
-  public SCMVersionResponseProto getVersion(RpcController controller,
-      SCMVersionRequestProto request)
-      throws ServiceException {
-    try {
-      return impl.getVersion(request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+  public SCMRegisteredResponseProto register(
+      SCMRegisterRequestProto request) throws IOException {
+    ContainerReportsProto containerRequestProto = request
+        .getContainerReport();
+    NodeReportProto dnNodeReport = request.getNodeReport();
+    PipelineReportsProto pipelineReport = request.getPipelineReports();
+    return impl.register(request.getDatanodeDetails(), dnNodeReport,
+        containerRequestProto, pipelineReport);
+
   }
 
   @Override
-  public SCMRegisteredResponseProto register(RpcController controller,
-      SCMRegisterRequestProto request) throws ServiceException {
-    try {
-      ContainerReportsProto containerRequestProto = request
-          .getContainerReport();
-      NodeReportProto dnNodeReport = request.getNodeReport();
-      PipelineReportsProto pipelineReport = request.getPipelineReports();
-      return impl.register(request.getDatanodeDetails(), dnNodeReport,
-          containerRequestProto, pipelineReport);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+  public SCMDatanodeResponse submitRequest(RpcController controller,
+      SCMDatanodeRequest request) throws ServiceException {
+    return dispatcher.processRequest(request, this::processMessage,
+        request.getCmdType(), request.getTraceID());
   }
 
-  @Override
-  public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller,
-      SCMHeartbeatRequestProto request) throws ServiceException {
+  public SCMDatanodeResponse processMessage(SCMDatanodeRequest request)
+      throws ServiceException {
     try {
-      return impl.sendHeartbeat(request);
+      Type cmdType = request.getCmdType();
+      switch (cmdType) {
+      case GetVersion:
+        return SCMDatanodeResponse.newBuilder()
+            .setCmdType(cmdType)
+            .setStatus(Status.OK)
+            .setGetVersionResponse(
+                impl.getVersion(request.getGetVersionRequest()))
+            .build();
+      case SendHeartbeat:
+        return SCMDatanodeResponse.newBuilder()
+            .setCmdType(cmdType)
+            .setStatus(Status.OK)
+            .setSendHeartbeatResponse(
+                impl.sendHeartbeat(request.getSendHeartbeatRequest()))
+            .build();
+      case Register:
+        return SCMDatanodeResponse.newBuilder()
+            .setCmdType(cmdType)
+            .setStatus(Status.OK)
+            .setRegisterResponse(register(request.getRegisterRequest()))
+            .build();
+      default:
+        throw new ServiceException("Unknown command type: " + cmdType);
+      }
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
-
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 1d09dfa..a975cd5 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -34,6 +34,45 @@ package hadoop.hdds;
 
 import "hdds.proto";
 
+
+message SCMDatanodeRequest {
+  required Type cmdType = 1; // Type of the command
+
+  optional string traceID = 2;
+
+  optional SCMVersionRequestProto getVersionRequest = 3;
+  optional SCMRegisterRequestProto registerRequest = 4;
+  optional SCMHeartbeatRequestProto sendHeartbeatRequest = 5;
+}
+
+message SCMDatanodeResponse {
+  required Type cmdType = 1; // Type of the command
+
+  optional string traceID = 2;
+
+  optional bool success = 3 [default = true];
+
+  optional string message = 4;
+
+  required Status status = 5;
+
+  optional SCMVersionResponseProto getVersionResponse = 6;
+  optional SCMRegisteredResponseProto registerResponse = 7;
+  optional SCMHeartbeatResponseProto sendHeartbeatResponse = 8;
+
+}
+
+enum Type {
+  GetVersion = 1;
+  Register = 2;
+  SendHeartbeat = 3;
+}
+
+enum Status {
+  OK = 1;
+  ERROR = 2;
+}
+
 /**
  * Request for version info of the software stack on the server.
  */
@@ -385,21 +424,6 @@ message ReplicateContainerCommandProto {
  */
 service StorageContainerDatanodeProtocolService {
 
-  /**
-  * Gets the version information from the SCM.
-  */
-  rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto);
-
-  /**
-  * Registers a data node with SCM.
-  */
-  rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto);
-
-  /**
-   * Send heartbeat from datanode to SCM. HB's under SCM looks more
-   * like life line protocol than HB's under HDFS. In other words, it is
-   * extremely light weight and contains no data payload.
-   */
-  rpc sendHeartbeat (SCMHeartbeatRequestProto) returns 
(SCMHeartbeatResponseProto);
-
+  //Message sent from Datanode to SCM as a heartbeat.
+  rpc submitRequest (SCMDatanodeRequest) returns (SCMDatanodeResponse);
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
index 514c822..5a7c30c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
@@ -29,12 +29,14 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import com.google.protobuf.BlockingService;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import org.mockito.Mockito;
 
 /**
  * Test Endpoint class.
@@ -91,7 +93,7 @@ public final class SCMTestUtils {
         StorageContainerDatanodeProtocolService.
             newReflectiveBlockingService(
                 new StorageContainerDatanodeProtocolServerSideTranslatorPB(
-                    server));
+                    server, Mockito.mock(ProtocolMessageMetrics.class)));
 
     RPC.Server scmServer = startRpcServer(configuration, rpcServerAddresss,
         StorageContainerDatanodeProtocolPB.class, scmDatanodeService,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 6dd9dab..530c0a6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -21,61 +21,32 @@
  */
 package org.apache.hadoop.hdds.scm.server;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.protobuf.BlockingService;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.closeContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.deleteBlocksCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.deleteContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
-    .replicateContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.reregisterCommand;
-
-
-
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .ReportFromDatanode;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-        .PipelineReportFromDatanode;
+import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -95,27 +66,28 @@ import 
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolServerSideTranslatorPB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.BlockingService;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closeContainerCommand;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
-
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 import static 
org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Protocol Handler for Datanode Protocol.
@@ -138,6 +110,7 @@ public class SCMDatanodeProtocolServer implements
   private final InetSocketAddress datanodeRpcAddress;
   private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
   private final EventPublisher eventPublisher;
+  private final ProtocolMessageMetrics protocolMessageMetrics;
 
   public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
       StorageContainerManager scm, EventPublisher eventPublisher)
@@ -157,12 +130,17 @@ public class SCMDatanodeProtocolServer implements
 
     RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
+
+    protocolMessageMetrics = ProtocolMessageMetrics
+        .create("SCMDatanodeProtocol", "SCM Datanode protocol",
+            StorageContainerDatanodeProtocolProtos.Type.values());
+
     BlockingService dnProtoPbService =
         StorageContainerDatanodeProtocolProtos
             .StorageContainerDatanodeProtocolService
             .newReflectiveBlockingService(
                 new StorageContainerDatanodeProtocolServerSideTranslatorPB(
-                    this));
+                    this, protocolMessageMetrics));
 
     InetSocketAddress datanodeRpcAddr =
         HddsServerUtil.getScmDataNodeBindAddress(conf);
@@ -191,6 +169,7 @@ public class SCMDatanodeProtocolServer implements
     LOG.info(
         StorageContainerManager.buildRpcServerStartMessage(
             "RPC server for DataNodes", datanodeRpcAddress));
+    protocolMessageMetrics.register();
     datanodeRpcServer.start();
   }
 
@@ -370,6 +349,7 @@ public class SCMDatanodeProtocolServer implements
       LOG.error(" datanodeRpcServer stop failed.", ex);
     }
     IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+    protocolMessageMetrics.unregister();
   }
 
   @Override
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java
index 95cda41..9a6b010 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.insight.scm.EventQueueInsight;
 import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight;
 import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight;
 import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight;
+import org.apache.hadoop.ozone.insight.scm.ScmProtocolDatanodeInsight;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 
 import picocli.CommandLine;
@@ -88,7 +89,8 @@ public class BaseInsightSubCommand {
     insights.put("scm.event-queue", new EventQueueInsight());
     insights.put("scm.protocol.block-location",
         new ScmProtocolBlockLocationInsight());
-
+    insights.put("scm.protocol.datanode",
+        new ScmProtocolDatanodeInsight());
     insights.put("om.key-manager", new KeyManagerInsight());
     insights.put("om.protocol.client", new OmProtocolInsight());
 
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
index 5ca0945..31c73c0 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
@@ -23,12 +23,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import 
org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
 import org.apache.hadoop.ozone.insight.BaseInsightPoint;
 import org.apache.hadoop.ozone.insight.Component.Type;
 import org.apache.hadoop.ozone.insight.LoggerSource;
 import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
-import 
org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 
 /**
  * Insight metric to check the SCM block location protocol behaviour.
@@ -42,9 +42,9 @@ public class ScmProtocolBlockLocationInsight extends 
BaseInsightPoint {
         new LoggerSource(Type.SCM,
             ScmBlockLocationProtocolServerSideTranslatorPB.class,
             defaultLevel(verbose)));
-    new LoggerSource(Type.SCM,
+    loggers.add(new LoggerSource(Type.SCM,
         SCMBlockProtocolServer.class,
-        defaultLevel(verbose));
+        defaultLevel(verbose)));
     return loggers;
   }
 
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java
similarity index 67%
copy from hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
copy to hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java
index 5ca0945..289af89 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java
@@ -22,29 +22,30 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
 import org.apache.hadoop.ozone.insight.BaseInsightPoint;
 import org.apache.hadoop.ozone.insight.Component.Type;
 import org.apache.hadoop.ozone.insight.LoggerSource;
 import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
-import 
org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
+import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
 
 /**
- * Insight metric to check the SCM block location protocol behaviour.
+ * Insight metric to check the SCM datanode protocol behaviour.
  */
-public class ScmProtocolBlockLocationInsight extends BaseInsightPoint {
+public class ScmProtocolDatanodeInsight extends BaseInsightPoint {
 
   @Override
   public List<LoggerSource> getRelatedLoggers(boolean verbose) {
     List<LoggerSource> loggers = new ArrayList<>();
     loggers.add(
         new LoggerSource(Type.SCM,
-            ScmBlockLocationProtocolServerSideTranslatorPB.class,
+            SCMDatanodeProtocolServer.class,
+            defaultLevel(verbose)));
+    loggers.add(
+        new LoggerSource(Type.SCM,
+            StorageContainerDatanodeProtocolServerSideTranslatorPB.class,
             defaultLevel(verbose)));
-    new LoggerSource(Type.SCM,
-        SCMBlockProtocolServer.class,
-        defaultLevel(verbose));
     return loggers;
   }
 
@@ -53,19 +54,19 @@ public class ScmProtocolBlockLocationInsight extends 
BaseInsightPoint {
     List<MetricGroupDisplay> metrics = new ArrayList<>();
 
     Map<String, String> filter = new HashMap<>();
-    filter.put("servername", "StorageContainerLocationProtocolService");
+    filter.put("servername", "StorageContainerDatanodeProtocolService");
 
     addRpcMetrics(metrics, Type.SCM, filter);
 
-    addProtocolMessageMetrics(metrics, "scm_block_location_protocol",
-        Type.SCM, ScmBlockLocationProtocolProtos.Type.values());
+    addProtocolMessageMetrics(metrics, "scm_datanode_protocol",
+        Type.SCM, StorageContainerDatanodeProtocolProtos.Type.values());
 
     return metrics;
   }
 
   @Override
   public String getDescription() {
-    return "SCM Block location protocol endpoint";
+    return "SCM Datanode protocol endpoint";
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to