This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1564 by this push:
new 1938862 HDDS-1576. Support configure more than one raft log storage to host m… (#117)
1938862 is described below
commit 193886202da7cd278350b691fd3107defdbdc08c
Author: Sammi Chen <[email protected]>
AuthorDate: Thu Dec 19 12:23:16 2019 +0800
HDDS-1576. Support configure more than one raft log storage to host m… (#117)
---
.../transport/server/ratis/XceiverServerRatis.java | 19 +++++
.../TestRatisPipelineCreateAndDestroy.java | 81 +++++++++++++++++++++-
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 27 +++++++-
3 files changed, 122 insertions(+), 5 deletions(-)
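
In effect, this change lets the raft log storage dir setting carry a comma-separated list of directories instead of a single path. A minimal usage sketch, assuming the OzoneConfiguration and OzoneConfigKeys classes from this repository; the disk paths below are illustrative, not taken from the patch:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;

    public class MultiRaftDirConfigSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Comma-separated list of raft log dirs; paths are hypothetical.
        conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
            "/disk1/ratis,/disk2/ratis,/disk3/ratis");
      }
    }
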
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index a76944b..706a8e3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -82,6 +82,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT;
+
/**
* Creates a ratis server endpoint that acts as the communication layer for
* Ozone containers.
@@ -213,6 +218,20 @@ public final class XceiverServerRatis implements XceiverServerSpi {
RaftServerConfigKeys.setStorageDirs(properties,
Collections.singletonList(new File(storageDir)));
+ // Check the raft storage dir count against the max allowed pipeline count
+ String[] dirs = storageDir.split(",");
+ int maxPipelinePerNode = conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
+ if (maxPipelinePerNode == 0 ||
+ (maxPipelinePerNode > 2 && dirs.length < (maxPipelinePerNode - 1))) {
+ LOG.warn("{} = {} is smaller than {} = {}. Suggest increasing {} or " +
+ "lowering {}", OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+ dirs.length,
+ OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, maxPipelinePerNode,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+ OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT);
+ }
+
// For grpc set the maximum message size
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize));
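
For readers skimming the hunk above: the new check splits the configured storage dir value on commas and warns when a datanode may host more pipelines than it has raft log dirs. A standalone sketch of that logic in plain Java; the dir list and limit below are hypothetical values, not defaults from the patch:

    public class RaftDirCountCheckSketch {
      public static void main(String[] args) {
        // Hypothetical comma-separated value of
        // OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR.
        String storageDir = "/disk1/ratis,/disk2/ratis";
        // Hypothetical value of OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT.
        int maxPipelinePerNode = 4;

        String[] dirs = storageDir.split(",");
        // Mirrors the patch: warn when the limit is 0, or when there are
        // fewer dirs than the extra pipelines this datanode may join.
        if (maxPipelinePerNode == 0
            || (maxPipelinePerNode > 2 && dirs.length < maxPipelinePerNode - 1)) {
          System.out.printf("WARN: %d raft storage dir(s), pipeline limit %d; "
              + "add dirs or lower the limit.%n", dirs.length, maxPipelinePerNode);
        }
      }
    }
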
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 0874f8b..c76068f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -24,7 +24,14 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.transport.server
+ .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server
+ .ratis.XceiverServerRatis;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -32,8 +39,11 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
@@ -47,15 +57,16 @@ public class TestRatisPipelineCreateAndDestroy {
private static MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
private static PipelineManager pipelineManager;
+ private static int maxPipelinePerNode = 4;
public void init(int numDatanodes) throws Exception {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
GenericTestUtils.getRandomizedTempPath());
- conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, maxPipelinePerNode);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
- .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
+ .setTotalPipelineNumLimit(numDatanodes + numDatanodes)
.setHbInterval(2000)
.setHbProcessorInterval(1000)
.build();
@@ -162,6 +173,72 @@ public class TestRatisPipelineCreateAndDestroy {
}
}
+ @Test(timeout = 300000)
+ public void testMultiRaftStorageDir() throws Exception {
+ final String suffix = "-testMultiRaftStorageDir-";
+ Map<String, AtomicInteger> directories = new ConcurrentHashMap<>();
+ int maxPipeline = maxPipelinePerNode;
+ int index = 0;
+ while (maxPipeline > 1) {
+ directories.put("ratis" + suffix + (index++), new AtomicInteger(0));
+ maxPipeline--;
+ }
+
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
+ 5, TimeUnit.SECONDS);
+ conf.set("dfs.container.ratis.datanode.storage.dir.suffix", suffix);
+
+ // Create 3 RATIS THREE pipelines
+ init(3);
+ // make sure the pipelines are created
+ waitForPipelines(3);
+ List<Pipeline> pipelines =
+ pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ List<RaftGroupId> raftGroupIds = new ArrayList<>();
+ pipelines.stream().forEach(pipeline ->
+ raftGroupIds.add(RaftGroupId.valueOf(pipeline.getId().getId())));
+
+ List<HddsDatanodeService> dns = new ArrayList<>(cluster.getHddsDatanodes());
+ dns.stream().forEach(dn -> {
+ XceiverServerSpi writeChannel =
+ dn.getDatanodeStateMachine().getContainer().getWriteChannel();
+ RaftServerProxy server =
+ (RaftServerProxy)((XceiverServerRatis)writeChannel).getServer();
+ raftGroupIds.stream().forEach(group -> {
+ try {
+ RaftServerImpl raft = server.getImpl(group);
+ String raftDir =
+ raft.getState().getStorage().getStorageDir().getRoot().toString();
+ directories.keySet().stream().forEach(path -> {
+ if (raftDir.contains(path)) {
+ directories.get(path).incrementAndGet();
+ }
+ });
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ });
+
+ directories.values().stream().forEach(
+ count -> Assert.assertEquals(maxPipelinePerNode - 1, count.get()));
+ }
+
+ @Test(timeout = 30000)
+ public void testMultiRaftPipelineWithSingleStorageDir() throws Exception {
+ int datanodeNum = 3;
+ // Create 3 RATIS THREE pipelines
+ init(datanodeNum);
+ // make sure the pipelines are created
+ waitForPipelines(datanodeNum);
+ List<Pipeline> pipelines =
+ pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals((datanodeNum * (maxPipelinePerNode - 1) /
+ HddsProtos.ReplicationFactor.THREE.getNumber()), pipelines.size());
+ }
+
private void waitForPipelines(int numPipelines)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> pipelineManager
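
A quick sanity check of the expected-count assertion in testMultiRaftPipelineWithSingleStorageDir above: with datanodeNum = 3 and maxPipelinePerNode = 4, each datanode can serve maxPipelinePerNode - 1 = 3 pipelines, giving 3 * 3 = 9 datanode slots; dividing by the 3 replicas of a RATIS THREE pipeline yields 9 / 3 = 3 expected pipelines.
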
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index bc937aa..fd7d221 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+
+import io.netty.util.internal.StringUtil;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.StorageUnit;
@@ -69,6 +71,8 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.HEALTHY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -604,22 +608,39 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
String[] args = new String[] {};
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();
+
+ String suffix =
+ conf.get("dfs.container.ratis.datanode.storage.dir.suffix");
for (int i = 0; i < numOfDatanodes; i++) {
OzoneConfiguration dnConf = new OzoneConfiguration(conf);
String datanodeBaseDir = path + "/datanode-" + Integer.toString(i);
Path metaDir = Paths.get(datanodeBaseDir, "meta");
Path dataDir = Paths.get(datanodeBaseDir, "data", "containers");
- Path ratisDir = Paths.get(datanodeBaseDir, "data", "ratis");
+ String ratisPath = "";
+ if (StringUtil.isNullOrEmpty(suffix)) {
+ ratisPath = Paths.get(datanodeBaseDir, "data", "ratis").toString();
+ } else {
+ int index = 0;
+ int maxPipelinePerNode =
+ conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
+ while (maxPipelinePerNode > 1) {
+ ratisPath += Paths.get(datanodeBaseDir,
+ "data", "ratis" + suffix + (index++)).toString() + ",";
+ maxPipelinePerNode--;
+ }
+ // remove the trailing ","
+ ratisPath = ratisPath.substring(0, ratisPath.length() - 1);
+ }
+
Path wrokDir = Paths.get(datanodeBaseDir, "data", "replication",
"work");
Files.createDirectories(metaDir);
Files.createDirectories(dataDir);
- Files.createDirectories(ratisDir);
Files.createDirectories(wrokDir);
dnConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
dataDir.toString());
dnConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
- ratisDir.toString());
+ ratisPath);
dnConf.set(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR,
wrokDir.toString());
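
For reference, the per-datanode path construction above can be reproduced standalone. A sketch using java.util.StringJoiner in place of the manual trailing-comma trim; the base dir, suffix, and limit below are illustrative values:

    import java.nio.file.Paths;
    import java.util.StringJoiner;

    public class RatisDirListSketch {
      public static void main(String[] args) {
        String datanodeBaseDir = "/tmp/datanode-0";   // hypothetical base dir
        String suffix = "-testMultiRaftStorageDir-";  // suffix as set by the test above
        int maxPipelinePerNode = 4;                   // hypothetical engagement limit

        // One storage dir per pipeline slot beyond the first, as in the loop above.
        StringJoiner ratisPath = new StringJoiner(",");
        for (int index = 0; index < maxPipelinePerNode - 1; index++) {
          ratisPath.add(
              Paths.get(datanodeBaseDir, "data", "ratis" + suffix + index).toString());
        }
        // Prints the comma-joined list, e.g.
        // /tmp/datanode-0/data/ratis-testMultiRaftStorageDir-0,...-1,...-2
        System.out.println(ratisPath);
      }
    }
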
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]