HDFS-12159. Ozone: SCM: Add create replication pipeline RPC. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a245c60b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a245c60b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a245c60b Branch: refs/heads/HDFS-7240 Commit: a245c60bb0eb7e5247d90b67c8947ab552af1e9f Parents: 6c1e9ab Author: Anu Engineer <aengin...@apache.org> Authored: Thu Aug 17 19:38:26 2017 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Thu Aug 17 19:38:26 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/protocol/DatanodeID.java | 23 ++- .../hadoop/hdfs/protocolPB/PBHelperClient.java | 15 -- .../apache/hadoop/ozone/OzoneConfigKeys.java | 16 ++ .../org/apache/hadoop/ozone/OzoneConsts.java | 2 + .../apache/hadoop/scm/XceiverClientManager.java | 33 ++++ .../scm/client/ContainerOperationClient.java | 38 ++-- .../org/apache/hadoop/scm/client/ScmClient.java | 50 ++--- .../StorageContainerLocationProtocol.java | 59 +++--- ...rLocationProtocolClientSideTranslatorPB.java | 72 ++++--- .../main/java/org/apache/ratis/RatisHelper.java | 2 +- .../src/main/proto/Ozone.proto | 20 ++ .../StorageContainerLocationProtocol.proto | 47 ++++- .../src/main/proto/hdfs.proto | 3 +- .../hadoop/cblock/storage/StorageManager.java | 8 +- .../statemachine/DatanodeStateMachine.java | 8 +- .../common/statemachine/StateContext.java | 12 +- .../states/datanode/InitDatanodeState.java | 2 + .../common/transport/server/XceiverServer.java | 11 ++ .../transport/server/XceiverServerSpi.java | 9 + .../server/ratis/XceiverServerRatis.java | 103 +++++++--- .../container/ozoneimpl/OzoneContainer.java | 49 +++-- ...rLocationProtocolServerSideTranslatorPB.java | 39 ++-- .../ozone/scm/StorageContainerManager.java | 89 ++++----- .../ozone/scm/block/BlockManagerImpl.java | 8 +- .../ozone/scm/container/ContainerMapping.java | 115 ++--------- .../hadoop/ozone/scm/container/Mapping.java | 15 +- 
.../ozone/scm/pipelines/PipelineManager.java | 69 +++++++ .../ozone/scm/pipelines/PipelineSelector.java | 198 +++++++++++++++++++ .../ozone/scm/pipelines/package-info.java | 38 ++++ .../scm/pipelines/ratis/RatisManagerImpl.java | 113 +++++++++++ .../ozone/scm/pipelines/ratis/package-info.java | 18 ++ .../standalone/StandaloneManagerImpl.java | 139 +++++++++++++ .../scm/pipelines/standalone/package-info.java | 18 ++ .../hadoop/ozone/scm/ratis/RatisManager.java | 59 ------ .../ozone/scm/ratis/RatisManagerImpl.java | 194 ------------------ .../src/main/resources/ozone-default.xml | 17 ++ .../apache/hadoop/cblock/TestBufferManager.java | 4 +- .../hadoop/cblock/TestCBlockReadWrite.java | 4 +- .../hadoop/cblock/TestLocalBlockCache.java | 4 +- .../hadoop/cblock/util/MockStorageClient.java | 20 +- .../namenode/TestFavoredNodesEndToEnd.java | 6 +- .../apache/hadoop/ozone/MiniOzoneCluster.java | 30 +-- .../apache/hadoop/ozone/RatisTestHelper.java | 10 +- .../hadoop/ozone/TestContainerOperations.java | 5 +- .../hadoop/ozone/TestMiniOzoneCluster.java | 34 +++- .../ozone/TestStorageContainerManager.java | 16 +- .../common/TestDatanodeStateMachine.java | 11 +- .../ozone/container/common/TestEndPoint.java | 15 +- .../container/ozoneimpl/TestOzoneContainer.java | 4 +- .../ozoneimpl/TestOzoneContainerRatis.java | 67 ++++--- .../container/ozoneimpl/TestRatisManager.java | 20 +- .../placement/TestContainerPlacement.java | 3 +- .../transport/server/TestContainerServer.java | 10 +- .../hadoop/ozone/scm/TestAllocateContainer.java | 19 +- .../ozone/scm/TestContainerSmallFile.java | 13 +- .../org/apache/hadoop/ozone/scm/TestSCMCli.java | 47 +++-- .../ozone/scm/TestXceiverClientManager.java | 24 ++- .../scm/container/TestContainerMapping.java | 28 ++- .../ozone/scm/node/TestContainerPlacement.java | 17 +- .../ozone/web/client/TestBucketsRatis.java | 3 +- .../hadoop/ozone/web/client/TestKeysRatis.java | 2 + .../ozone/web/client/TestVolumeRatis.java | 6 +- 62 files changed, 1369 
insertions(+), 764 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index c563079..517e474 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -52,7 +52,8 @@ public class DatanodeID implements Comparable<DatanodeID> { private int infoSecurePort; // info server port private int ipcPort; // IPC server port private String xferAddr; - private int containerPort; // container server port. + private int containerPort; // container Stand_alone Rpc port. + private int ratisPort; // Container Ratis RPC Port. /** * UUID identifying a given datanode. For upgraded Datanodes this is the @@ -78,7 +79,7 @@ public class DatanodeID implements Comparable<DatanodeID> { } /** - * Create a DatanodeID + * Create a DatanodeID. * @param ipAddr IP * @param hostName hostname * @param datanodeUuid data node ID, UUID for new Datanodes, may be the @@ -296,6 +297,22 @@ public class DatanodeID implements Comparable<DatanodeID> { } /** + * Gets the Ratis Port. + * @return retis port. + */ + public int getRatisPort() { + return ratisPort; + } + + /** + * Sets the Ratis Port. + * @param ratisPort - Ratis port. + */ + public void setRatisPort(int ratisPort) { + this.ratisPort = ratisPort; + } + + /** * Returns a DataNode ID from the protocol buffers. 
* * @param datanodeIDProto - protoBuf Message @@ -308,6 +325,7 @@ public class DatanodeID implements Comparable<DatanodeID> { datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(), datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort()); id.setContainerPort(datanodeIDProto.getContainerPort()); + id.setRatisPort(datanodeIDProto.getRatisPort()); return id; } @@ -326,6 +344,7 @@ public class DatanodeID implements Comparable<DatanodeID> { .setInfoSecurePort(this.getInfoSecurePort()) .setIpcPort(this.getIpcPort()) .setContainerPort(this.getContainerPort()) + .setRatisPort(this.getRatisPort()) .build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 72b703a..a16c679 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -184,8 +184,6 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; -import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ChunkedArrayList; @@ -2489,19 +2487,6 @@ public class PBHelperClient { return result; } - public static 
ContainerRequestProto.ReplicationFactor - convertReplicationFactor(ScmClient.ReplicationFactor replicationFactor) { - switch (replicationFactor) { - case ONE: - return ContainerRequestProto.ReplicationFactor.ONE; - case THREE: - return ContainerRequestProto.ReplicationFactor.THREE; - default: - throw new IllegalArgumentException("Ozone only supports replicaiton" + - " factor 1 or 3"); - } - } - public static XAttr convertXAttr(XAttrProto a) { XAttr.Builder builder = new XAttr.Builder(); builder.setNameSpace(convert(a.getNamespace())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 8a99359..64c7987 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -45,6 +45,22 @@ public final class OzoneConfigKeys { public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT = false; + /** + * Ratis Port where containers listen to. + */ + public static final String DFS_CONTAINER_RATIS_IPC_PORT = + "dfs.container.ratis.ipc"; + public static final int DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT = 50012; + + /** + * When set to true, allocate a random free port for ozone container, so that + * a mini cluster is able to launch multiple containers on a node. 
+ */ + public static final String DFS_CONTAINER_RATIS_IPC_RANDOM_PORT = + "dfs.container.ratis.ipc.random.port"; + public static final boolean DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT = + false; + public static final String OZONE_LOCALSTORAGE_ROOT = "ozone.localstorage.root"; public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 5e73045..68f1e09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -123,6 +123,8 @@ public final class OzoneConsts { */ public static final int MAX_LISTVOLUMES_SIZE = 1024; + public static final int INVALID_PORT = -1; + private OzoneConsts() { // Never Constructed } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index b0e4e4e..508c004 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -31,6 +31,7 @@ import com.google.common.cache.CacheBuilder; import 
com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import static org.apache.hadoop.scm.ScmConfigKeys @@ -164,4 +165,36 @@ public class XceiverClientManager implements Closeable { clientCache.invalidateAll(); clientCache.cleanUp(); } + + /** + * Tells us if Ratis is enabled for this cluster. + * @return True if Ratis is enabled. + */ + public boolean isUseRatis() { + return useRatis; + } + + /** + * Returns hard coded 3 as replication factor. + * @return 3 + */ + public OzoneProtos.ReplicationFactor getFactor() { + if(isUseRatis()) { + return OzoneProtos.ReplicationFactor.THREE; + } + return OzoneProtos.ReplicationFactor.ONE; + } + + /** + * Returns the default replication type. + * @return Ratis or Standalone + */ + public OzoneProtos.ReplicationType getType() { + // TODO : Fix me and make Ratis default before release. 
+ if(isUseRatis()) { + return OzoneProtos.ReplicationType.RATIS; + } + return OzoneProtos.ReplicationType.STAND_ALONE; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java index 5ee70bc..a90cff4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java @@ -72,10 +72,7 @@ public class ContainerOperationClient implements ScmClient { } /** - * Create a container with the given ID as its name. - * @param containerId - String container ID - * @return A Pipeline object to actually write/read from the container. - * @throws IOException + * @inheritDoc */ @Override public Pipeline createContainer(String containerId) @@ -83,7 +80,10 @@ public class ContainerOperationClient implements ScmClient { XceiverClientSpi client = null; try { Pipeline pipeline = - storageContainerLocationClient.allocateContainer(containerId); + storageContainerLocationClient.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerId); + client = xceiverClientManager.acquireClient(pipeline); String traceID = UUID.randomUUID().toString(); ContainerProtocolCalls.createContainer(client, traceID); @@ -101,21 +101,18 @@ public class ContainerOperationClient implements ScmClient { } /** - * Creates a Container on SCM with specified replication factor. 
- * @param containerId - String container ID - * @param replicationFactor - replication factor - * @return Pipeline - * @throws IOException + * @inheritDoc */ @Override - public Pipeline createContainer(String containerId, - ScmClient.ReplicationFactor replicationFactor) throws IOException { + public Pipeline createContainer(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor factor, + String containerId) throws IOException { XceiverClientSpi client = null; try { // allocate container on SCM. Pipeline pipeline = - storageContainerLocationClient.allocateContainer(containerId, - replicationFactor); + storageContainerLocationClient.allocateContainer(type, factor, + containerId); // connect to pipeline leader and allocate container on leader datanode. client = xceiverClientManager.acquireClient(pipeline); String traceID = UUID.randomUUID().toString(); @@ -123,7 +120,7 @@ public class ContainerOperationClient implements ScmClient { LOG.info("Created container " + containerId + " leader:" + pipeline.getLeader() + " machines:" + pipeline.getMachines() + - " replication factor:" + replicationFactor.getValue()); + " replication factor:" + factor); return pipeline; } finally { if (client != null) { @@ -150,6 +147,17 @@ public class ContainerOperationClient implements ScmClient { } /** + * Creates a specified replication pipeline. + */ + @Override + public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool) + throws IOException { + return storageContainerLocationClient.createReplicationPipeline(type, + factor, nodePool); + } + + /** * Delete the container, this will release any resource it uses. * @param pipeline - Pipeline that represents the container. * @param force - True to forcibly delete the container. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java index c651a8b..2c2d244 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java @@ -95,41 +95,16 @@ public interface ScmClient { long getContainerSize(Pipeline pipeline) throws IOException; /** - * Replication factors supported by Ozone and SCM. - */ - enum ReplicationFactor{ - ONE(1), - THREE(3); - - private final int value; - ReplicationFactor(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - public static ReplicationFactor parseReplicationFactor(int i) { - switch (i) { - case 1: return ONE; - case 3: return THREE; - default: - throw new IllegalArgumentException("Only replication factor 1 or 3" + - " is supported by Ozone/SCM."); - } - } - } - - /** * Creates a Container on SCM and returns the pipeline. - * @param containerId - String container ID - * @param replicationFactor - replication factor (only 1/3 is supported) + * @param type - Replication Type. + * @param replicationFactor - Replication Factor + * @param containerId - Container ID * @return Pipeline - * @throws IOException + * @throws IOException - in case of error. */ - Pipeline createContainer(String containerId, - ReplicationFactor replicationFactor) throws IOException; + Pipeline createContainer(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor replicationFactor, String containerId) + throws IOException; /** * Returns a set of Nodes that meet a query criteria. 
@@ -141,4 +116,15 @@ public interface ScmClient { */ OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName) throws IOException; + + /** + * Creates a specified replication pipeline. + * @param type - Type + * @param factor - Replication factor + * @param nodePool - Set of machines. + * @throws IOException + */ + Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool) + throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java index 6bb5800..ea0893e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java @@ -1,19 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.hadoop.scm.protocol; @@ -22,7 +21,6 @@ import java.io.IOException; import java.util.EnumSet; import java.util.List; -import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; @@ -31,26 +29,14 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; * that currently host a container. */ public interface StorageContainerLocationProtocol { - - /** - * Asks SCM where a container should be allocated. SCM responds with the - * set of datanodes that should be used creating this container. - * @param containerName - Name of the container. - * @return Pipeline. - * @throws IOException - */ - Pipeline allocateContainer(String containerName) throws IOException; - /** * Asks SCM where a container should be allocated. SCM responds with the * set of datanodes that should be used creating this container. - * @param containerName - Name of the container. - * @param replicationFactor - replication factor. 
- * @return Pipeline. - * @throws IOException + * */ - Pipeline allocateContainer(String containerName, - ScmClient.ReplicationFactor replicationFactor) throws IOException; + Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType, + OzoneProtos.ReplicationFactor factor, + String containerName) throws IOException; /** * Ask SCM the location of the container. SCM responds with a group of @@ -99,4 +85,15 @@ public interface StorageContainerLocationProtocol { OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName) throws IOException; + /** + * Creates a replication pipeline of a specified type. + * @param type - replication type + * @param factor - factor 1 or 3 + * @param nodePool - optional machine list to build a pipeline. + * @throws IOException + */ + Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool) + throws IOException; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 7ec4a86..93cd0cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -21,12 +21,10 @@ import com.google.common.base.Strings; 
import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; @@ -37,6 +35,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; import org.apache.hadoop.scm.container.common.helpers.Pipeline; @@ -74,37 +74,27 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB /** * Asks SCM where a container should be allocated. SCM responds with the set - * of datanodes that should be used creating this container. - * - * @param containerName - Name of the container. - * @return Pipeline. 
- * @throws IOException - */ - @Override - public Pipeline allocateContainer(String containerName) throws IOException { - return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE); - } - - /** - * Asks SCM where a container should be allocated. SCM responds with the set * of datanodes that should be used creating this container. Ozone/SCM only * supports replication factor of either 1 or 3. - * - * @param containerName - Name of the container. - * @param replicationFactor - replication factor. - * @return Pipeline. + * @param type - Replication Type + * @param factor - Replication Count + * @param containerName - Name + * @return * @throws IOException */ @Override - public Pipeline allocateContainer(String containerName, - ScmClient.ReplicationFactor replicationFactor) throws IOException { + public Pipeline allocateContainer(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor factor, String + containerName) throws IOException { Preconditions.checkNotNull(containerName, "Container Name cannot be Null"); Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" + " be empty"); ContainerRequestProto request = ContainerRequestProto.newBuilder() - .setContainerName(containerName).setReplicationFactor(PBHelperClient - .convertReplicationFactor(replicationFactor)).build(); + .setContainerName(containerName) + .setReplicationFactor(factor) + .setReplicationType(type) + .build(); final ContainerResponseProto response; try { @@ -217,6 +207,42 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB } + /** + * Creates a replication pipeline of a specified type. + * + * @param replicationType - replication type + * @param factor - factor 1 or 3 + * @param nodePool - optional machine list to build a pipeline. 
+ * @throws IOException + */ + @Override + public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType + replicationType, OzoneProtos.ReplicationFactor factor, OzoneProtos + .NodePool nodePool) throws IOException { + PipelineRequestProto request = PipelineRequestProto.newBuilder() + .setNodePool(nodePool) + .setReplicationFactor(factor) + .setReplicationType(replicationType) + .build(); + try { + PipelineResponseProto response = + rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request); + if (response.getErrorCode() == + PipelineResponseProto.Error.success) { + Preconditions.checkState(response.hasPipeline(), "With success, " + + "must come a pipeline"); + return Pipeline.getFromProtoBuf(response.getPipeline()); + } else { + String errorMessage = String.format("create replication pipeline " + + "failed. code : %s Message: %s", response.getErrorCode(), + response.hasErrorMessage() ? response.getErrorMessage() : ""); + throw new IOException(errorMessage); + } + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + @Override public Object getUnderlyingProxyObject() { return rpcProxy; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java index bedd9a8..a2acdff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java @@ -41,7 +41,7 @@ public interface RatisHelper { Logger LOG = LoggerFactory.getLogger(RatisHelper.class); static String toRaftPeerIdString(DatanodeID id) { - return id.getIpAddr() + ":" + id.getContainerPort(); + return id.getIpAddr() 
+ ":" + id.getRatisPort(); } static RaftPeerId toRaftPeerId(DatanodeID id) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto index ede6ea9..50926c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto @@ -35,6 +35,7 @@ message Pipeline { required string leaderID = 1; repeated DatanodeIDProto members = 2; required string containerName = 3; + optional LifeCycleStates state = 4 [default = OPERATIONAL]; } message KeyValue { @@ -71,3 +72,22 @@ message NodePool { repeated Node nodes = 1; } + +enum ReplicationType { + RATIS = 1; + STAND_ALONE = 2; + CHAINED = 3; +} + + +enum ReplicationFactor { + ONE = 1; + THREE = 3; +} + +enum LifeCycleStates { + CLIENT_CREATE = 1; // A request to client to create this object + OPERATIONAL = 2; // Mostly an update to SCM via HB or client call. + TIMED_OUT = 3; // creation has timed out from SCM's View. + DELETED = 4; // object is deleted. 
+} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto index 6c16347..30c7166 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto @@ -37,11 +37,9 @@ import "Ozone.proto"; message ContainerRequestProto { required string containerName = 1; // Ozone only support replciation of either 1 or 3. - enum ReplicationFactor { - ONE = 1; - THREE = 3; - } - required ReplicationFactor replicationFactor = 2; + required hadoop.hdfs.ozone.ReplicationFactor replicationFactor = 2; + required hadoop.hdfs.ozone.ReplicationType replicationType = 3; + } /** @@ -111,6 +109,28 @@ message NodeQueryResponseProto { required hadoop.hdfs.ozone.NodePool datanodes = 1; } +/** + Request to create a replication pipeline. + */ +message PipelineRequestProto { + required hadoop.hdfs.ozone.ReplicationType replicationType = 1; + required hadoop.hdfs.ozone.ReplicationFactor replicationFactor = 2; + + // if datanodes are specified then pipelines are created using those + // datanodes. + optional hadoop.hdfs.ozone.NodePool nodePool = 3; + optional string pipelineID = 4; +} + +message PipelineResponseProto { + enum Error { + success = 1; + errorPipelineAlreadyExists = 2; + } + required Error errorCode = 1; + optional hadoop.hdfs.ozone.Pipeline pipeline = 2; + optional string errorMessage = 3; +} /** * Protocol used from an HDFS node to StorageContainerManager. 
See the request @@ -139,4 +159,21 @@ service StorageContainerLocationProtocolService { * Returns a set of Nodes that meet a criteria. */ rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto); + + /* + * Apis that Manage Pipelines. + * + * Pipelines are abstractions offered by SCM and Datanode that allows users + * to create a replication pipeline. + * + * These following APIs allow command line programs like SCM CLI to list + * and manage pipelines. + */ + + /** + * Creates a replication pipeline. + */ + rpc allocatePipeline(PipelineRequestProto) + returns (PipelineResponseProto); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 06d6802..497d734 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -58,7 +58,8 @@ message DatanodeIDProto { required uint32 infoPort = 5; // datanode http port required uint32 ipcPort = 6; // ipc server port optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port - optional uint32 containerPort = 8 [default = 0]; // Ozone container protocol + optional uint32 containerPort = 8 [default = 0]; // Ozone stand_alone protocol + optional uint32 ratisPort = 9 [default = 0]; //Ozone ratis port } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java index 368c250..1f22aa8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java @@ -25,6 +25,7 @@ import org.apache.hadoop.cblock.meta.VolumeInfo; import org.apache.hadoop.cblock.proto.MountVolumeResponse; import org.apache.hadoop.cblock.util.KeyUtil; import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.slf4j.Logger; @@ -179,9 +180,10 @@ public class StorageManager { long allocatedSize = 0; ArrayList<String> containerIds = new ArrayList<>(); while (allocatedSize < volumeSize) { - Pipeline pipeline = storageClient.createContainer( - KeyUtil.getContainerName(userName, volumeName, containerIdx), - ScmClient.ReplicationFactor.ONE); + Pipeline pipeline = storageClient.createContainer(OzoneProtos + .ReplicationType.STAND_ALONE, + OzoneProtos.ReplicationFactor.ONE, + KeyUtil.getContainerName(userName, volumeName, containerIdx)); ContainerDescriptor container = new ContainerDescriptor(pipeline.getContainerName()); container.setPipeline(pipeline); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 5e0a656..66992af 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler; @@ -72,7 +73,7 @@ public class DatanodeStateMachine implements Closeable { context = new StateContext(this.conf, DatanodeStates.getInitState(), this); heartbeatFrequency = TimeUnit.SECONDS.toMillis( OzoneClientUtils.getScmHeartbeatInterval(conf)); - container = new OzoneContainer(conf); + container = new OzoneContainer(datanodeID, new OzoneConfiguration(conf)); this.datanodeID = datanodeID; nextHB = new AtomicLong(Time.monotonicNow()); @@ -87,11 +88,6 @@ public class DatanodeStateMachine implements Closeable { .build(); } - public DatanodeStateMachine(Configuration conf) - throws IOException { - this(null, conf); - } - public void setDatanodeID(DatanodeID datanodeID) { this.datanodeID = datanodeID; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index eb2ca3a..994b245 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; import static org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ReportState.states .noContainerReports; @@ -90,7 +91,16 @@ public class StateContext { */ public int getContainerPort() { return parent == null ? - -1 : parent.getContainer().getContainerServerPort(); + INVALID_PORT : parent.getContainer().getContainerServerPort(); + } + + /** + * Gets the Ratis Port. + * @return int , return -1 if not valid. + */ + public int getRatisPort() { + return parent == null ? 
+ INVALID_PORT : parent.getContainer().getRatisContainerServerPort(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java index d35f64d..0552a2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java @@ -113,9 +113,11 @@ public class InitDatanodeState implements DatanodeState, } File idPath = new File(dataNodeIDPath); int containerPort = this.context.getContainerPort(); + int ratisPort = this.context.getRatisPort(); DatanodeID datanodeID = this.context.getParent().getDatanodeID(); if (datanodeID != null) { datanodeID.setContainerPort(containerPort); + datanodeID.setRatisPort(ratisPort); ContainerUtils.writeDatanodeIDTo(datanodeID, idPath); LOG.info("Datanode ID is persisted to {}", dataNodeIDPath); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java index 3a6e672..7271cb2 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -29,6 +29,7 @@ import io.netty.handler.logging.LoggingHandler; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +82,16 @@ public final class XceiverServer implements XceiverServerSpi { return this.port; } + /** + * Returns the Replication type supported by this end-point. + * + * @return enum -- {Stand_Alone, Ratis, Chained} + */ + @Override + public OzoneProtos.ReplicationType getServerType() { + return OzoneProtos.ReplicationType.STAND_ALONE; + } + @Override public void start() throws IOException { bossGroup = new NioEventLoopGroup(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index e5b0497..b61d7fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.transport.server; +import 
org.apache.hadoop.ozone.protocol.proto.OzoneProtos; + import java.io.IOException; /** A server endpoint that acts as the communication layer for Ozone @@ -31,4 +33,11 @@ public interface XceiverServerSpi { /** Get server IPC port. */ int getIPCPort(); + + /** + * Returns the Replication type supported by this end-point. + * @return enum -- {Stand_Alone, Ratis, Chained} + */ + OzoneProtos.ReplicationType getServerType(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 69f3801..ba613c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -18,10 +18,15 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerSpi; + +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.conf.RaftProperties; import 
org.apache.ratis.grpc.GrpcConfigKeys; @@ -34,7 +39,9 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; +import java.net.ServerSocket; import java.util.Collections; import java.util.Objects; @@ -44,6 +51,22 @@ import java.util.Objects; */ public final class XceiverServerRatis implements XceiverServerSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); + private final int port; + private final RaftServer server; + + private XceiverServerRatis( + String id, int port, String storageDir, + ContainerDispatcher dispatcher, RpcType rpcType) throws IOException { + Objects.requireNonNull(id, "id == null"); + this.port = port; + + this.server = RaftServer.newBuilder() + .setServerId(RaftPeerId.valueOf(id)) + .setPeers(Collections.emptyList()) + .setProperties(newRaftProperties(rpcType, port, storageDir)) + .setStateMachine(new ContainerStateMachine(dispatcher)) + .build(); + } static RaftProperties newRaftProperties( RpcType rpc, int port, String storageDir) { @@ -52,44 +75,60 @@ public final class XceiverServerRatis implements XceiverServerSpi { RaftConfigKeys.Rpc.setType(properties, rpc); if (rpc == SupportedRpcType.GRPC) { GrpcConfigKeys.Server.setPort(properties, port); - } else if (rpc == SupportedRpcType.NETTY) { - NettyConfigKeys.Server.setPort(properties, port); + } else { + if (rpc == SupportedRpcType.NETTY) { + NettyConfigKeys.Server.setPort(properties, port); + } } return properties; } - public static XceiverServerRatis newXceiverServerRatis( + public static XceiverServerRatis newXceiverServerRatis(String datanodeID, Configuration ozoneConf, ContainerDispatcher dispatcher) throws IOException { - final String id = ozoneConf.get( - OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID); - final int port = ozoneConf.getInt( - OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - final String 
storageDir = ozoneConf.get( + final String ratisDir = File.separator + "ratis"; + int localPort = ozoneConf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT); + String storageDir = ozoneConf.get( OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); + + if (Strings.isNullOrEmpty(storageDir)) { + storageDir = ozoneConf.get(OzoneConfigKeys + .OZONE_CONTAINER_METADATA_DIRS); + Preconditions.checkNotNull(storageDir, "ozone.container.metadata.dirs " + + "cannot be null, Please check your configs."); + storageDir = storageDir.concat(ratisDir); + LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " + + "storage under {}. It is a good idea to map this to an SSD disk.", + storageDir); + } final String rpcType = ozoneConf.get( OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); - return new XceiverServerRatis(id, port, storageDir, dispatcher, rpc); - } - private final int port; - private final RaftServer server; - - private XceiverServerRatis( - String id, int port, String storageDir, - ContainerDispatcher dispatcher, RpcType rpcType) throws IOException { - Objects.requireNonNull(id, "id == null"); - this.port = port; - - this.server = RaftServer.newBuilder() - .setServerId(RaftPeerId.valueOf(id)) - .setPeers(Collections.emptyList()) - .setProperties(newRaftProperties(rpcType, port, storageDir)) - .setStateMachine(new ContainerStateMachine(dispatcher)) - .build(); + // Get an available port on current node and + // use that as the container port + if (ozoneConf.getBoolean(OzoneConfigKeys + .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + localPort = socket.getLocalPort(); + LOG.info("Found a free port for the server : {}", localPort); 
+ // If we have random local ports configured this means that it + // probably running under MiniOzoneCluster. Ratis locks the storage + // directories, so we need to pass different local directory for each + // local instance. So we map ratis directories under datanode ID. + storageDir = storageDir.concat(File.separator + datanodeID); + } catch (IOException e) { + LOG.error("Unable find a random free port for the server, " + + "fallback to use default port {}", localPort, e); + } + } + return new XceiverServerRatis(datanodeID, localPort, storageDir, + dispatcher, rpc); } @Override @@ -112,4 +151,14 @@ public final class XceiverServerRatis implements XceiverServerSpi { public int getIPCPort() { return port; } + + /** + * Returns the Replication type supported by this end-point. + * + * @return enum -- {Stand_Alone, Ratis, Chained} + */ + @Override + public OzoneProtos.ReplicationType getServerType() { + return OzoneProtos.ReplicationType.RATIS; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 792c132..307f59f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import 
org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; @@ -33,6 +34,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; + +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; @@ -50,6 +53,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS; import static org.apache.hadoop.ozone.OzoneConfigKeys .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; /** * Ozone main class sets up the network server and initializes the container @@ -62,7 +66,7 @@ public class OzoneContainer { private final Configuration ozoneConfig; private final ContainerDispatcher dispatcher; private final ContainerManager manager; - private final XceiverServerSpi server; + private final XceiverServerSpi[] server; private final ChunkManager chunkManager; private final KeyManager keyManager; private final BlockDeletingService blockDeletingService; @@ -73,8 +77,8 @@ public class OzoneContainer { * @param ozoneConfig - Config * @throws IOException */ - public OzoneContainer( - Configuration ozoneConfig) throws IOException { + public OzoneContainer(DatanodeID datanodeID, Configuration ozoneConfig) throws + IOException { this.ozoneConfig = ozoneConfig; List<StorageLocation> locations = new LinkedList<>(); String[] paths = ozoneConfig.getStrings( @@ -104,12 +108,11 @@ public class OzoneContainer { this.dispatcher = new Dispatcher(manager, this.ozoneConfig); - final 
boolean useRatis = ozoneConfig.getBoolean( - OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); - server = useRatis? - XceiverServerRatis.newXceiverServerRatis(ozoneConfig, dispatcher) - : new XceiverServer(this.ozoneConfig, this.dispatcher); + server = new XceiverServerSpi[]{ + new XceiverServer(this.ozoneConfig, this.dispatcher), + XceiverServerRatis.newXceiverServerRatis(datanodeID + .getDatanodeUuid().toString(), ozoneConfig, dispatcher) + }; } /** @@ -118,7 +121,9 @@ public class OzoneContainer { * @throws IOException */ public void start() throws IOException { - server.start(); + for (XceiverServerSpi serverinstance : server) { + serverinstance.start(); + } blockDeletingService.start(); dispatcher.init(); } @@ -157,7 +162,9 @@ public class OzoneContainer { */ public void stop() { LOG.info("Attempting to stop container services."); - server.stop(); + for(XceiverServerSpi serverinstance: server) { + serverinstance.stop(); + } dispatcher.shutdown(); try { @@ -194,13 +201,31 @@ public class OzoneContainer { return this.manager.getNodeReport(); } + private int getPortbyType(OzoneProtos.ReplicationType replicationType) { + for (XceiverServerSpi serverinstance : server) { + if (serverinstance.getServerType() == replicationType) { + return serverinstance.getIPCPort(); + } + } + return INVALID_PORT; + } + /** * Returns the container server IPC port. * * @return Container server IPC port. */ public int getContainerServerPort() { - return server.getIPCPort(); + return getPortbyType(OzoneProtos.ReplicationType.STAND_ALONE); + } + + /** + * Returns the Ratis container Server IPC port. + * + * @return Ratis port. 
+ */ + public int getRatisContainerServerPort() { + return getPortbyType(OzoneProtos.ReplicationType.RATIS); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index e45afb9..628de42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -30,22 +31,17 @@ import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos; import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; -import static org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.ContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.ContainerResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.GetContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.GetContainerResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.DeleteContainerRequestProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.DeleteContainerResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.ListContainerResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos.ListContainerRequestProto; +import static org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto; +import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; + import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; @@ -74,7 +70,8 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB public ContainerResponseProto allocateContainer(RpcController unused, ContainerRequestProto request) throws ServiceException { try { - Pipeline pipeline = impl.allocateContainer(request.getContainerName()); + Pipeline pipeline = impl.allocateContainer(request.getReplicationType(), + request.getReplicationFactor(), request.getContainerName()); return ContainerResponseProto.newBuilder() .setPipeline(pipeline.getProtobufMessage()) .setErrorCode(ContainerResponseProto.Error.success) @@ -161,4 +158,12 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB throw new ServiceException(e); } } + + @Override + public PipelineResponseProto allocatePipeline( + RpcController controller, PipelineRequestProto request) + throws ServiceException { + // TODO : Wiring this up requires one more patch. 
+ return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index e8cc8f0..e320983 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -39,41 +39,24 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ReportState; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; 
-import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SendContainerReportProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.Type; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerLocationProtocolProtos; -import org.apache.hadoop.ozone.protocolPB - .ScmBlockLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeAddressList; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto; +import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; +import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerDatanodeProtocolServerSideTranslatorPB; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.scm.block.BlockManager; import org.apache.hadoop.ozone.scm.block.BlockManagerImpl; import org.apache.hadoop.ozone.scm.container.ContainerMapping; @@ -81,7 +64,6 @@ import org.apache.hadoop.ozone.scm.container.Mapping; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.SCMNodeManager; -import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult; import org.apache.hadoop.scm.container.common.helpers.Pipeline; @@ -386,21 +368,6 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl } /** - * Asks SCM where a container should be allocated. SCM responds with the set - * of datanodes that should be used creating this container. - * - * @param containerName - Name of the container. - * @return Pipeline. 
- * @throws IOException - */ - @Override - public Pipeline allocateContainer(String containerName) throws IOException { - checkAdminAccess(); - return scmContainerManager.allocateContainer(containerName, - ScmClient.ReplicationFactor.ONE); - } - - /** * {@inheritDoc} */ @Override @@ -458,6 +425,19 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl } /** + * Creates a replication pipeline of a specified type. + */ + @Override + public Pipeline createReplicationPipeline( + OzoneProtos.ReplicationType replicationType, + OzoneProtos.ReplicationFactor factor, + OzoneProtos.NodePool nodePool) + throws IOException { + // TODO: will be addressed in future patch. + return null; + } + + /** * Queries a list of Node that match a set of statuses. * <p> * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, @@ -527,11 +507,12 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl * @throws IOException */ @Override - public Pipeline allocateContainer(String containerName, - ScmClient.ReplicationFactor replicationFactor) throws IOException { + public Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType, + OzoneProtos.ReplicationFactor replicationFactor, String containerName) + throws IOException { checkAdminAccess(); - return scmContainerManager.allocateContainer(containerName, - replicationFactor); + return scmContainerManager.allocateContainer(replicationType, + replicationFactor, containerName); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java index 0eb60e4..e000ccc 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.scm.container.Mapping; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -177,7 +178,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { for (int i = 0; i < count; i++) { String containerName = UUID.randomUUID().toString(); try { - Pipeline pipeline = containerManager.allocateContainer(containerName); + // TODO: Fix this later when Ratis is made the Default. + Pipeline pipeline = containerManager.allocateContainer( + OzoneProtos.ReplicationType.STAND_ALONE, + OzoneProtos.ReplicationFactor.ONE, + containerName); + if (pipeline == null) { LOG.warn("Unable to allocate container."); continue; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index 643779d..8daa5d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -1,4 +1,3 @@ - /** * Licensed to the Apache Software Foundation (ASF) under one or more * 
contributor license agreements. See the NOTICE file distributed with this @@ -22,15 +21,11 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; @@ -41,8 +36,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -65,8 +58,7 @@ public class ContainerMapping implements Mapping { private final Lock lock; private final Charset encoding = Charset.forName("UTF-8"); private final MetadataStore containerStore; - private final ContainerPlacementPolicy placementPolicy; - private final long containerSize; + private final PipelineSelector pipelineSelector; /** * Constructs a mapping class that creates mapping between container names and @@ -96,66 +88,10 @@ public class ContainerMapping implements Mapping { .build(); this.lock = new ReentrantLock(); - - this.containerSize = OzoneConsts.GB * conf.getInt( - 
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); - this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); + this.pipelineSelector = new PipelineSelector(nodeManager, conf); } - /** - * Create pluggable container placement policy implementation instance. - * - * @param nodeManager - SCM node manager. - * @param conf - configuration. - * @return SCM container placement policy implementation instance. - */ - @SuppressWarnings("unchecked") - private static ContainerPlacementPolicy createContainerPlacementPolicy( - final NodeManager nodeManager, final Configuration conf) { - Class<? extends ContainerPlacementPolicy> implClass = - (Class<? extends ContainerPlacementPolicy>) conf.getClass( - ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, - SCMContainerPlacementRandom.class); - try { - Constructor<? extends ContainerPlacementPolicy> ctor = - implClass.getDeclaredConstructor(NodeManager.class, - Configuration.class); - return ctor.newInstance(nodeManager, conf); - } catch (RuntimeException e) { - throw e; - } catch (InvocationTargetException e) { - throw new RuntimeException(implClass.getName() - + " could not be constructed.", e.getCause()); - } catch (Exception e) { - LOG.error("Unhandled exception occured, Placement policy will not be " + - "functional."); - throw new IllegalArgumentException("Unable to load " + - "ContainerPlacementPolicy", e); - } - } - - /** - * Translates a list of nodes, ordered such that the first is the leader, into - * a corresponding {@link Pipeline} object. - * @param nodes - list of datanodes on which we will allocate the container. - * The first of the list will be the leader node. 
- * @param containerName container name - * @return pipeline corresponding to nodes - */ - private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes, - final String containerName) { - Preconditions.checkNotNull(nodes); - Preconditions.checkArgument(nodes.size() > 0); - String leaderId = nodes.get(0).getDatanodeUuid(); - Pipeline pipeline = new Pipeline(leaderId); - for (DatanodeID node : nodes) { - pipeline.addMember(node); - } - pipeline.setContainerName(containerName); - return pipeline; - } /** * Returns the Pipeline from the container name. @@ -192,7 +128,7 @@ public class ContainerMapping implements Mapping { List<Pipeline> pipelineList = new ArrayList<>(); lock.lock(); try { - if(containerStore.isEmpty()) { + if (containerStore.isEmpty()) { throw new IOException("No container exists in current db"); } MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName); @@ -217,26 +153,14 @@ public class ContainerMapping implements Mapping { * Allocates a new container. * * @param containerName - Name of the container. - * @return - Pipeline that makes up this container. - * @throws IOException - */ - @Override - public Pipeline allocateContainer(final String containerName) - throws IOException { - return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE); - } - - /** - * Allocates a new container. - * - * @param containerName - Name of the container. * @param replicationFactor - replication factor of the container. * @return - Pipeline that makes up this container. 
- * @throws IOException + * @throws IOException - Exception */ @Override - public Pipeline allocateContainer(final String containerName, - final ScmClient.ReplicationFactor replicationFactor) throws IOException { + public Pipeline allocateContainer(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor replicationFactor, + final String containerName) throws IOException { Preconditions.checkNotNull(containerName); Preconditions.checkState(!containerName.isEmpty()); Pipeline pipeline = null; @@ -253,18 +177,10 @@ public class ContainerMapping implements Mapping { throw new SCMException("Specified container already exists. key : " + containerName, SCMException.ResultCodes.CONTAINER_EXISTS); } - List<DatanodeID> datanodes = placementPolicy.chooseDatanodes( - replicationFactor.getValue(), containerSize); - // TODO: handle under replicated container - if (datanodes != null && datanodes.size() > 0) { - pipeline = newPipelineFromNodes(datanodes, containerName); - containerStore.put(containerName.getBytes(encoding), - pipeline.getProtobufMessage().toByteArray()); - } else { - LOG.debug("Unable to find enough datanodes for new container. " + - "Required {} found {}", replicationFactor, - datanodes != null ? datanodes.size(): 0); - } + pipeline = pipelineSelector.getReplicationPipeline(type, + replicationFactor, containerName); + containerStore.put(containerName.getBytes(encoding), + pipeline.getProtobufMessage().toByteArray()); } finally { lock.unlock(); } @@ -275,9 +191,8 @@ public class ContainerMapping implements Mapping { * Deletes a container from SCM. * * @param containerName - Container name - * @throws IOException - * if container doesn't exist - * or container store failed to delete the specified key. + * @throws IOException if container doesn't exist or container store failed to + * delete the specified key. 
*/ @Override public void deleteContainer(String containerName) throws IOException { @@ -286,7 +201,7 @@ public class ContainerMapping implements Mapping { byte[] dbKey = containerName.getBytes(encoding); byte[] pipelineBytes = containerStore.get(dbKey); - if(pipelineBytes == null) { + if (pipelineBytes == null) { throw new SCMException("Failed to delete container " + containerName + ", reason : container doesn't exist.", SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java index 194158b..1ef3572 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.scm.container; -import org.apache.hadoop.scm.client.ScmClient; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import java.io.Closeable; @@ -57,14 +57,6 @@ public interface Mapping extends Closeable { List<Pipeline> listContainer(String startName, String prefixName, int count) throws IOException; - /** - * Allocates a new container for a given keyName. - * - * @param containerName - Name - * @return - Pipeline that makes up this container. - * @throws IOException - */ - Pipeline allocateContainer(String containerName) throws IOException; /** * Allocates a new container for a given keyName and replication factor. 
@@ -74,8 +66,9 @@ public interface Mapping extends Closeable { * @return - Pipeline that makes up this container. * @throws IOException */ - Pipeline allocateContainer(String containerName, - ScmClient.ReplicationFactor replicationFactor) throws IOException; + Pipeline allocateContainer(OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor replicationFactor, + String containerName) throws IOException; /** * Deletes a container from SCM. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java new file mode 100644 index 0000000..6293d84 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.scm.pipelines; + + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; + +import java.io.IOException; +import java.util.List; + +/** + * Manage Ozone pipelines. + */ +public interface PipelineManager { + + /** + * This function is called by the Container Manager while allocating a new + * container. The client specifies what kind of replication pipeline is + * needed and based on the replication type in the request appropriate + * Interface is invoked. + * + * @param containerName Name of the container + * @param replicationFactor - Replication Factor + * @return a Pipeline. + */ + Pipeline getPipeline(String containerName, + OzoneProtos.ReplicationFactor replicationFactor) throws IOException; + + /** + * Creates a pipeline from a specified set of Nodes. + * @param pipelineID - Name of the pipeline + * @param datanodes - The list of datanodes that make this pipeline. + */ + void createPipeline(String pipelineID, List<DatanodeID> datanodes) + throws IOException; + + /** + * Close the pipeline with the given pipelineID. + */ + void closePipeline(String pipelineID) throws IOException; + + /** + * List the members of the pipeline. + * @return the list of datanodes in the pipeline + */ + List<DatanodeID> getMembers(String pipelineID) throws IOException; + + /** + * Update the datanode list of the pipeline. + */ + void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes) + throws IOException; +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org