[36/50] [abbrv] hadoop git commit: HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee.
HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3cca120 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3cca120 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3cca120 Branch: refs/heads/HDFS-12943 Commit: e3cca1204874d37b48095c8ff9a44c1f16dc15ed Parents: e98a506 Author: Anu Engineer Authored: Mon Oct 22 15:31:13 2018 -0700 Committer: Anu Engineer Committed: Mon Oct 22 15:57:01 2018 -0700 -- .../hadoop/hdds/scm/XceiverClientGrpc.java | 194 +-- .../hadoop/hdds/scm/XceiverClientManager.java | 14 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 6 + .../scm/container/common/helpers/Pipeline.java | 4 + .../ozone/client/io/ChunkGroupInputStream.java | 9 +- .../ozone/client/io/ChunkGroupOutputStream.java | 4 + .../hadoop/ozone/TestMiniOzoneCluster.java | 2 +- .../ozone/client/rpc/TestOzoneRpcClient.java| 108 +++ .../ozone/scm/TestXceiverClientManager.java | 15 +- 9 files changed, 286 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3cca120/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java -- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 2f11872..9526be3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.UUID; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private final Pipeline pipeline; private final Configuration config; - private XceiverClientProtocolServiceStub asyncStub; + private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs; private XceiverClientMetrics metrics; - private ManagedChannel channel; + private Map<UUID, ManagedChannel> channels; private final Semaphore semaphore; private boolean closed = false; @@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi { this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); +this.channels = new HashMap<>(); +this.asyncStubs = new HashMap<>(); } @Override public void connect() throws Exception { + +// leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails leader = this.pipeline.getLeader(); +// just make a connection to the 1st datanode at the beginning +connectToDatanode(leader); + } + private void connectToDatanode(DatanodeDetails dn) { // read port from the data node, on failure use default configured // port. 
-int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); +int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); if (port == 0) { port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); } -LOG.debug("Connecting to server Port : " + leader.getIpAddress()); -channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) -.usePlaintext() -.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) -.build(); -asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); +LOG.debug("Connecting to server Port : " + dn.getIpAddress()); +ManagedChannel channel = +NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext() +
hadoop git commit: HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee.
Repository: hadoop Updated Branches: refs/heads/ozone-0.3 575613ef7 -> 3191afa12 HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3191afa1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3191afa1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3191afa1 Branch: refs/heads/ozone-0.3 Commit: 3191afa12134cb85ff3af68d50906a352b4b6979 Parents: 575613e Author: Shashikant Banerjee Authored: Tue Oct 23 19:19:58 2018 +0530 Committer: Shashikant Banerjee Committed: Tue Oct 23 19:19:58 2018 +0530 -- .../hadoop/hdds/scm/XceiverClientGrpc.java | 194 +-- .../hadoop/hdds/scm/XceiverClientManager.java | 14 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 6 + .../scm/container/common/helpers/Pipeline.java | 4 + .../ozone/client/io/ChunkGroupInputStream.java | 9 +- .../ozone/client/io/ChunkGroupOutputStream.java | 4 + .../hadoop/ozone/TestMiniOzoneCluster.java | 2 +- .../ozone/client/rpc/TestOzoneRpcClient.java| 107 ++ .../ozone/scm/TestXceiverClientManager.java | 15 +- 9 files changed, 285 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java -- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 2f11872..9526be3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.UUID; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private final Pipeline pipeline; private final Configuration config; - private XceiverClientProtocolServiceStub asyncStub; + private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs; private XceiverClientMetrics metrics; - private ManagedChannel channel; + private Map<UUID, ManagedChannel> channels; private final Semaphore semaphore; private boolean closed = false; @@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi { this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); +this.channels = new HashMap<>(); +this.asyncStubs = new HashMap<>(); } @Override public void connect() throws Exception { + +// leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails leader = this.pipeline.getLeader(); +// just make a connection to the 1st datanode at the beginning +connectToDatanode(leader); + } + private void connectToDatanode(DatanodeDetails dn) { // read port from the data node, on failure use default configured // port. 
-int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); +int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); if (port == 0) { port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); } -LOG.debug("Connecting to server Port : " + leader.getIpAddress()); -channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) -.usePlaintext() -.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) -.build(); -asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); +LOG.debug("Connecting to server Port : " + dn.getIpAddress()); +ManagedChannel channel = +
hadoop git commit: HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee.
Repository: hadoop Updated Branches: refs/heads/trunk e98a50622 -> e3cca1204 HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3cca120 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3cca120 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3cca120 Branch: refs/heads/trunk Commit: e3cca1204874d37b48095c8ff9a44c1f16dc15ed Parents: e98a506 Author: Anu Engineer Authored: Mon Oct 22 15:31:13 2018 -0700 Committer: Anu Engineer Committed: Mon Oct 22 15:57:01 2018 -0700 -- .../hadoop/hdds/scm/XceiverClientGrpc.java | 194 +-- .../hadoop/hdds/scm/XceiverClientManager.java | 14 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 6 + .../scm/container/common/helpers/Pipeline.java | 4 + .../ozone/client/io/ChunkGroupInputStream.java | 9 +- .../ozone/client/io/ChunkGroupOutputStream.java | 4 + .../hadoop/ozone/TestMiniOzoneCluster.java | 2 +- .../ozone/client/rpc/TestOzoneRpcClient.java| 108 +++ .../ozone/scm/TestXceiverClientManager.java | 15 +- 9 files changed, 286 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3cca120/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java -- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 2f11872..9526be3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.UUID; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private final Pipeline pipeline; private final Configuration config; - private XceiverClientProtocolServiceStub asyncStub; + private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs; private XceiverClientMetrics metrics; - private ManagedChannel channel; + private Map<UUID, ManagedChannel> channels; private final Semaphore semaphore; private boolean closed = false; @@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi { this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); +this.channels = new HashMap<>(); +this.asyncStubs = new HashMap<>(); } @Override public void connect() throws Exception { + +// leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails leader = this.pipeline.getLeader(); +// just make a connection to the 1st datanode at the beginning +connectToDatanode(leader); + } + private void connectToDatanode(DatanodeDetails dn) { // read port from the data node, on failure use default configured // port. 
-int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); +int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); if (port == 0) { port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); } -LOG.debug("Connecting to server Port : " + leader.getIpAddress()); -channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) -.usePlaintext() -.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) -.build(); -asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); +LOG.debug("Connecting to server Port : " + dn.getIpAddress()); +ManagedChannel channel = +