[36/50] [abbrv] hadoop git commit: HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee.

2018-10-23 Thread sunchao
HDDS-676. Enable Read from open Containers via Standalone Protocol.
Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3cca120
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3cca120
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3cca120

Branch: refs/heads/HDFS-12943
Commit: e3cca1204874d37b48095c8ff9a44c1f16dc15ed
Parents: e98a506
Author: Anu Engineer 
Authored: Mon Oct 22 15:31:13 2018 -0700
Committer: Anu Engineer 
Committed: Mon Oct 22 15:57:01 2018 -0700

--
 .../hadoop/hdds/scm/XceiverClientGrpc.java  | 194 +--
 .../hadoop/hdds/scm/XceiverClientManager.java   |  14 +-
 .../hadoop/hdds/scm/XceiverClientRatis.java |   6 +
 .../scm/container/common/helpers/Pipeline.java  |   4 +
 .../ozone/client/io/ChunkGroupInputStream.java  |   9 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |   4 +
 .../hadoop/ozone/TestMiniOzoneCluster.java  |   2 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java| 108 +++
 .../ozone/scm/TestXceiverClientManager.java |  15 +-
 9 files changed, 286 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3cca120/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
--
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 2f11872..9526be3 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.UUID;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
   private final Pipeline pipeline;
   private final Configuration config;
-  private XceiverClientProtocolServiceStub asyncStub;
+  private Map asyncStubs;
   private XceiverClientMetrics metrics;
-  private ManagedChannel channel;
+  private Map channels;
   private final Semaphore semaphore;
   private boolean closed = false;
 
@@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 this.semaphore =
 new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
 this.metrics = XceiverClientManager.getXceiverClientMetrics();
+this.channels = new HashMap<>();
+this.asyncStubs = new HashMap<>();
   }
 
   @Override
   public void connect() throws Exception {
+
+// leader by default is the 1st datanode in the datanode list of pipeline
 DatanodeDetails leader = this.pipeline.getLeader();
+// just make a connection to the 1st datanode at the beginning
+connectToDatanode(leader);
+  }
 
+  private void connectToDatanode(DatanodeDetails dn) {
 // read port from the data node, on failure use default configured
 // port.
-int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
 if (port == 0) {
   port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
   OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
 }
-LOG.debug("Connecting to server Port : " + leader.getIpAddress());
-channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
-.usePlaintext()
-.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
-.build();
-asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+LOG.debug("Connecting to server Port : " + dn.getIpAddress());
+ManagedChannel channel =
+NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
+

hadoop git commit: HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee.

2018-10-23 Thread shashikant
Repository: hadoop
Updated Branches:
  refs/heads/ozone-0.3 575613ef7 -> 3191afa12


HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed 
by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3191afa1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3191afa1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3191afa1

Branch: refs/heads/ozone-0.3
Commit: 3191afa12134cb85ff3af68d50906a352b4b6979
Parents: 575613e
Author: Shashikant Banerjee 
Authored: Tue Oct 23 19:19:58 2018 +0530
Committer: Shashikant Banerjee 
Committed: Tue Oct 23 19:19:58 2018 +0530

--
 .../hadoop/hdds/scm/XceiverClientGrpc.java  | 194 +--
 .../hadoop/hdds/scm/XceiverClientManager.java   |  14 +-
 .../hadoop/hdds/scm/XceiverClientRatis.java |   6 +
 .../scm/container/common/helpers/Pipeline.java  |   4 +
 .../ozone/client/io/ChunkGroupInputStream.java  |   9 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |   4 +
 .../hadoop/ozone/TestMiniOzoneCluster.java  |   2 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java| 107 ++
 .../ozone/scm/TestXceiverClientManager.java |  15 +-
 9 files changed, 285 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
--
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 2f11872..9526be3 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.UUID;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
   private final Pipeline pipeline;
   private final Configuration config;
-  private XceiverClientProtocolServiceStub asyncStub;
+  private Map asyncStubs;
   private XceiverClientMetrics metrics;
-  private ManagedChannel channel;
+  private Map channels;
   private final Semaphore semaphore;
   private boolean closed = false;
 
@@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 this.semaphore =
 new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
 this.metrics = XceiverClientManager.getXceiverClientMetrics();
+this.channels = new HashMap<>();
+this.asyncStubs = new HashMap<>();
   }
 
   @Override
   public void connect() throws Exception {
+
+// leader by default is the 1st datanode in the datanode list of pipeline
 DatanodeDetails leader = this.pipeline.getLeader();
+// just make a connection to the 1st datanode at the beginning
+connectToDatanode(leader);
+  }
 
+  private void connectToDatanode(DatanodeDetails dn) {
 // read port from the data node, on failure use default configured
 // port.
-int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
 if (port == 0) {
   port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
   OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
 }
-LOG.debug("Connecting to server Port : " + leader.getIpAddress());
-channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
-.usePlaintext()
-.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
-.build();
-asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+LOG.debug("Connecting to server Port : " + dn.getIpAddress());
+ManagedChannel channel =
+

hadoop git commit: HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee.

2018-10-22 Thread aengineer
Repository: hadoop
Updated Branches:
  refs/heads/trunk e98a50622 -> e3cca1204


HDDS-676. Enable Read from open Containers via Standalone Protocol.
Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3cca120
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3cca120
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3cca120

Branch: refs/heads/trunk
Commit: e3cca1204874d37b48095c8ff9a44c1f16dc15ed
Parents: e98a506
Author: Anu Engineer 
Authored: Mon Oct 22 15:31:13 2018 -0700
Committer: Anu Engineer 
Committed: Mon Oct 22 15:57:01 2018 -0700

--
 .../hadoop/hdds/scm/XceiverClientGrpc.java  | 194 +--
 .../hadoop/hdds/scm/XceiverClientManager.java   |  14 +-
 .../hadoop/hdds/scm/XceiverClientRatis.java |   6 +
 .../scm/container/common/helpers/Pipeline.java  |   4 +
 .../ozone/client/io/ChunkGroupInputStream.java  |   9 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |   4 +
 .../hadoop/ozone/TestMiniOzoneCluster.java  |   2 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java| 108 +++
 .../ozone/scm/TestXceiverClientManager.java |  15 +-
 9 files changed, 286 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3cca120/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
--
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 2f11872..9526be3 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.UUID;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
   private final Pipeline pipeline;
   private final Configuration config;
-  private XceiverClientProtocolServiceStub asyncStub;
+  private Map asyncStubs;
   private XceiverClientMetrics metrics;
-  private ManagedChannel channel;
+  private Map channels;
   private final Semaphore semaphore;
   private boolean closed = false;
 
@@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 this.semaphore =
 new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
 this.metrics = XceiverClientManager.getXceiverClientMetrics();
+this.channels = new HashMap<>();
+this.asyncStubs = new HashMap<>();
   }
 
   @Override
   public void connect() throws Exception {
+
+// leader by default is the 1st datanode in the datanode list of pipeline
 DatanodeDetails leader = this.pipeline.getLeader();
+// just make a connection to the 1st datanode at the beginning
+connectToDatanode(leader);
+  }
 
+  private void connectToDatanode(DatanodeDetails dn) {
 // read port from the data node, on failure use default configured
 // port.
-int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
 if (port == 0) {
   port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
   OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
 }
-LOG.debug("Connecting to server Port : " + leader.getIpAddress());
-channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
-.usePlaintext()
-.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
-.build();
-asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+LOG.debug("Connecting to server Port : " + dn.getIpAddress());
+ManagedChannel channel =
+