This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f8a62d6 HDDS-3810. Add the logic to distribute open containers among
the pipelines of a datanode. (#1274)
f8a62d6 is described below
commit f8a62d6d506974b5ee2df64bdc2ea3aea6ff7610
Author: bshashikant <[email protected]>
AuthorDate: Mon Oct 5 11:02:22 2020 +0530
HDDS-3810. Add the logic to distribute open containers among the pipelines
of a datanode. (#1274)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 ++
.../common/src/main/resources/ozone-default.xml | 7 ++
.../hdds/scm/container/SCMContainerManager.java | 11 +++-
.../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 33 ++++++++++
.../apache/hadoop/hdds/scm/node/NodeManager.java | 6 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 53 +++++++++++++++-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 4 +-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 7 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 6 +-
.../hdds/scm/pipeline/SCMPipelineManager.java | 16 ++++-
.../hadoop/hdds/scm/block/TestBlockManager.java | 69 ++++++++++++++++++++
.../hadoop/hdds/scm/container/MockNodeManager.java | 28 +++++++-
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 4 +-
.../scm/pipeline/TestPipelinePlacementPolicy.java | 4 +-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 74 ++++++++++++++++++++++
.../testutils/ReplicationNodeManagerMock.java | 12 +++-
.../hadoop/ozone/fsck/TestContainerMapper.java | 3 +
17 files changed, 319 insertions(+), 22 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 672b440..540d2c0 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -313,6 +313,10 @@ public final class ScmConfigKeys {
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
"ozone.scm.keyvalue.container.deletion-choosing.policy";
+ public static final String OZONE_SCM_PIPELINE_PER_METADATA_VOLUME =
+ "ozone.scm.pipeline.per.metadata.disk";
+
+ public static final int OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT = 2;
// Max timeout for pipeline to stay at ALLOCATED state before scrubbed.
public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
"ozone.scm.pipeline.allocated.timeout";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f16ff3f..70d35a3 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -822,6 +822,13 @@
</description>
</property>
<property>
+ <name>ozone.scm.pipeline.per.metadata.disk</name>
+ <value>2</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>Number of pipelines to be created per raft log disk.
+ </description>
+ </property>
+ <property>
<name>ozone.datanode.pipeline.limit</name>
<value>2</value>
<tag>OZONE, SCM, PIPELINE</tag>
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 5857c4c..fa286a2 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -107,6 +107,14 @@ public class SCMContainerManager implements
ContainerManager {
scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
}
+ private int getOpenContainerCountPerPipeline(Pipeline pipeline) {
+ int minContainerCountPerDn = numContainerPerVolume *
+ pipelineManager.minHealthyVolumeNum(pipeline);
+ int minPipelineCountPerDn = pipelineManager.minPipelineLimit(pipeline);
+ return (int) Math.ceil(
+ ((double) minContainerCountPerDn / minPipelineCountPerDn));
+ }
+
private void loadExistingContainers() throws IOException {
TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
@@ -440,8 +448,7 @@ public class SCMContainerManager implements
ContainerManager {
synchronized (pipeline) {
containerIDs = getContainersForOwner(pipeline, owner);
- if (containerIDs.size() < numContainerPerVolume * pipelineManager.
- getNumHealthyVolumes(pipeline)) {
+ if (containerIDs.size() < getOpenContainerCountPerPipeline(pipeline)) {
containerInfo =
containerStateManager.allocateContainer(
pipelineManager, owner, pipeline);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index b39440f..2e7bdeb 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
import org.apache.hadoop.util.Time;
import java.util.Collections;
@@ -40,6 +42,7 @@ public class DatanodeInfo extends DatanodeDetails {
private long lastStatsUpdatedTime;
private List<StorageReportProto> storageReports;
+ private List<MetadataStorageReportProto> metadataStorageReports;
/**
* Constructs DatanodeInfo from DatanodeDetails.
@@ -51,6 +54,7 @@ public class DatanodeInfo extends DatanodeDetails {
this.lock = new ReentrantReadWriteLock();
this.lastHeartbeatTime = Time.monotonicNow();
this.storageReports = Collections.emptyList();
+ this.metadataStorageReports = Collections.emptyList();
}
/**
@@ -95,6 +99,22 @@ public class DatanodeInfo extends DatanodeDetails {
}
/**
+ * Updates the datanode metadata storage reports.
+ *
+ * @param reports list of metadata storage report
+ */
+ public void updateMetaDataStorageReports(
+ List<MetadataStorageReportProto> reports) {
+ try {
+ lock.writeLock().lock();
+ lastStatsUpdatedTime = Time.monotonicNow();
+ metadataStorageReports = reports;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
* Returns the storage reports associated with this datanode.
*
* @return list of storage report
@@ -122,6 +142,19 @@ public class DatanodeInfo extends DatanodeDetails {
}
/**
+ * Returns count of healthy metadata volumes reported from datanode.
+ * @return count of healthy metadata log volumes
+ */
+ public int getMetaDataVolumeCount() {
+ try {
+ lock.readLock().lock();
+ return metadataStorageReports.size();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
* Returns count of failed volumes reported from datanode.
* @return count of failed volumes
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index df21b84..4af2357 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -214,5 +214,9 @@ public interface NodeManager extends
StorageContainerNodeProtocol,
*/
NetworkTopology getClusterNetworkTopologyMap();
- int getNumHealthyVolumes(List <DatanodeDetails> dnList);
+ int minHealthyVolumeNum(List <DatanodeDetails> dnList);
+
+ int pipelineLimit(DatanodeDetails dn);
+
+ int minPipelineLimit(List<DatanodeDetails> dn);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 1a0cec3..328f271 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -42,6 +42,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -103,6 +104,8 @@ public class SCMNodeManager implements NodeManager {
private final boolean useHostname;
private final ConcurrentHashMap<String, Set<String>> dnsToUuidMap =
new ConcurrentHashMap<>();
+ private final int numPipelinesPerMetadataVolume;
+ private final int heavyNodeCriteria;
/**
* Constructs SCM machine Manager.
@@ -130,6 +133,11 @@ public class SCMNodeManager implements NodeManager {
this.useHostname = conf.getBoolean(
DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+ this.numPipelinesPerMetadataVolume =
+ conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
+ String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+ this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
}
private void registerMXBean() {
@@ -363,6 +371,8 @@ public class SCMNodeManager implements NodeManager {
DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
if (nodeReport != null) {
datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
+ datanodeInfo.updateMetaDataStorageReports(nodeReport.
+ getMetadataStorageReportList());
metrics.incNumNodeReportProcessed();
}
} catch (NodeNotFoundException e) {
@@ -511,11 +521,11 @@ public class SCMNodeManager implements NodeManager {
}
/**
- * Returns the max of no healthy volumes reported out of the set
+ * Returns the min number of healthy volumes reported out of the set
* of datanodes constituting the pipeline.
*/
@Override
- public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+ public int minHealthyVolumeNum(List<DatanodeDetails> dnList) {
List<Integer> volumeCountList = new ArrayList<>(dnList.size());
for (DatanodeDetails dn : dnList) {
try {
@@ -527,7 +537,44 @@ public class SCMNodeManager implements NodeManager {
}
}
Preconditions.checkArgument(!volumeCountList.isEmpty());
- return Collections.max(volumeCountList);
+ return Collections.min(volumeCountList);
+ }
+
+ /**
+ * Returns the pipeline limit for the datanode.
+ * If the datanode pipeline limit is set, that value is used as the max
+ * pipeline limit.
+ * If the pipeline limit is not set, the max pipeline limit is based on
+ * the number of raft log volumes reported, provided that the datanode
+ * has at least one healthy data volume; otherwise the limit is 0.
+ */
+ @Override
+ public int pipelineLimit(DatanodeDetails dn) {
+ try {
+ if (heavyNodeCriteria > 0) {
+ return heavyNodeCriteria;
+ } else if (nodeStateManager.getNode(dn).getHealthyVolumeCount() > 0) {
+ return numPipelinesPerMetadataVolume *
+ nodeStateManager.getNode(dn).getMetaDataVolumeCount();
+ }
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Cannot generate NodeStat, datanode {} not found.",
+ dn.getUuid());
+ }
+ return 0;
+ }
+
+ /**
+ * Returns the min pipeline limit among the given set of datanodes.
+ */
+ @Override
+ public int minPipelineLimit(List<DatanodeDetails> dnList) {
+ List<Integer> pipelineCountList = new ArrayList<>(dnList.size());
+ for (DatanodeDetails dn : dnList) {
+ pipelineCountList.add(pipelineLimit(dn));
+ }
+ Preconditions.checkArgument(!pipelineCountList.isEmpty());
+ return Collections.min(pipelineCountList);
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 857f76e..0cb905e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -89,7 +89,9 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean,
void incNumBlocksAllocatedMetric(PipelineID id);
- int getNumHealthyVolumes(Pipeline pipeline);
+ int minHealthyVolumeNum(Pipeline pipeline);
+
+ int minPipelineLimit(Pipeline pipeline);
/**
* Activates a dormant pipeline.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 84efdc2..b9441be 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -75,9 +75,8 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
this.nodeManager = nodeManager;
this.conf = conf;
this.stateManager = stateManager;
- this.heavyNodeCriteria = conf.getInt(
- ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
- ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+ String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+ this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
}
int currentPipelineCount(DatanodeDetails datanodeDetails, int nodesRequired)
{
@@ -149,7 +148,7 @@ public final class PipelinePlacementPolicy extends
SCMCommonPlacementPolicy {
.map(d ->
new DnWithPipelines(d, currentPipelineCount(d, nodesRequired)))
.filter(d ->
- ((d.getPipelines() < heavyNodeCriteria) || heavyNodeCriteria == 0))
+ (d.getPipelines() < nodeManager.pipelineLimit(d.getDn())))
.sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
.map(d -> d.getDn())
.collect(Collectors.toList());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 4d91541..e39f141 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -64,9 +64,9 @@ public class RatisPipelineProvider extends PipelineProvider {
this.pipelineNumberLimit = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
- this.maxPipelinePerDatanode = conf.getInt(
- ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
- ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+ String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+ this.maxPipelinePerDatanode = dnLimit == null ? 0 :
+ Integer.parseInt(dnLimit);
}
private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index f072ebb..db7fcae 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -672,13 +672,23 @@ public class SCMPipelineManager implements
PipelineManager {
}
/**
- * returns max count of healthy volumes from the set of
+ * returns min number of healthy volumes from the set of
* datanodes constituting the pipeline.
* @param pipeline
* @return healthy volume count
*/
- public int getNumHealthyVolumes(Pipeline pipeline) {
- return nodeManager.getNumHealthyVolumes(pipeline.getNodes());
+ public int minHealthyVolumeNum(Pipeline pipeline) {
+ return nodeManager.minHealthyVolumeNum(pipeline.getNodes());
+ }
+
+ /**
+ * returns min pipeline limit from the set of
+ * datanodes constituting the pipeline.
+ * @param pipeline
+ * @return min pipeline limit
+ */
+ public int minPipelineLimit(Pipeline pipeline) {
+ return nodeManager.minPipelineLimit(pipeline.getNodes());
}
protected ReadWriteLock getLock() {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index a72031c..6b6e8d8 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -245,6 +245,7 @@ public class TestBlockManager {
public void testBlockDistribution() throws Exception {
int threadCount = numContainerPerOwnerInPipeline *
numContainerPerOwnerInPipeline;
+ nodeManager.setNumPipelinePerDatanode(1);
List<ExecutorService> executors = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
executors.add(Executors.newSingleThreadExecutor());
@@ -304,6 +305,7 @@ public class TestBlockManager {
int threadCount = numContainerPerOwnerInPipeline *
numContainerPerOwnerInPipeline;
nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
+ nodeManager.setNumPipelinePerDatanode(1);
List<ExecutorService> executors = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
executors.add(Executors.newSingleThreadExecutor());
@@ -365,6 +367,71 @@ public class TestBlockManager {
}
@Test
+ public void testBlockDistributionWithMultipleRaftLogDisks() throws Exception
{
+ int threadCount = numContainerPerOwnerInPipeline *
+ numContainerPerOwnerInPipeline;
+ int numMetaDataVolumes = 2;
+ nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
+ nodeManager.setNumMetaDataVolumes(numMetaDataVolumes);
+ List<ExecutorService> executors = new ArrayList<>(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executors.add(Executors.newSingleThreadExecutor());
+ }
+ pipelineManager.createPipeline(type, factor);
+ TestUtils.openAllRatisPipelines(pipelineManager);
+ Map<Long, List<AllocatedBlock>> allocatedBlockMap =
+ new ConcurrentHashMap<>();
+ List<CompletableFuture<AllocatedBlock>> futureList =
+ new ArrayList<>(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ final CompletableFuture<AllocatedBlock> future =
+ new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ List<AllocatedBlock> blockList;
+ AllocatedBlock block = blockManager
+ .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor,
+ OzoneConsts.OZONE,
+ new ExcludeList());
+ long containerId = block.getBlockID().getContainerID();
+ if (!allocatedBlockMap.containsKey(containerId)) {
+ blockList = new ArrayList<>();
+ } else {
+ blockList = allocatedBlockMap.get(containerId);
+ }
+ blockList.add(block);
+ allocatedBlockMap.put(containerId, blockList);
+ future.complete(block);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }, executors.get(i));
+ futureList.add(future);
+ }
+ try {
+ CompletableFuture
+ .allOf(futureList.toArray(
+ new CompletableFuture[futureList.size()])).get();
+ Assert.assertTrue(
+ pipelineManager.getPipelines(type).size() == 1);
+ Pipeline pipeline = pipelineManager.getPipelines(type).get(0);
+ // MockNodeManager's pipeline limit per datanode is 2 by default
+ int numContainers = (int)Math.ceil((double)
+ (numContainerPerOwnerInPipeline *
+ numContainerPerOwnerInPipeline)/numMetaDataVolumes);
+ Assert.assertTrue(numContainers == pipelineManager.
+ getNumberOfContainers(pipeline.getId()));
+ Assert.assertTrue(
+ allocatedBlockMap.size() == numContainers);
+ Assert.assertTrue(allocatedBlockMap.
+ values().size() == numContainers);
+ } catch (Exception e) {
+ Assert.fail("testAllocateBlockInParallel failed");
+ }
+ }
+
+ @Test
public void testAllocateOversizedBlock() throws Exception {
long size = 6 * GB;
thrown.expectMessage("Unsupported block size");
@@ -434,6 +501,8 @@ public class TestBlockManager {
@Test(timeout = 10000)
public void testMultipleBlockAllocationWithClosedContainer()
throws IOException, TimeoutException, InterruptedException {
+ nodeManager.setNumPipelinePerDatanode(1);
+ nodeManager.setNumHealthyVolumes(1);
// create pipelines
for (int i = 0;
i < nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size() / factor
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 4b8b37d..7aca0f3 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -68,6 +68,7 @@ import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
* Test Helper for testing container Mapping.
*/
public class MockNodeManager implements NodeManager {
+ public final static int NUM_PIPELINE_PER_METADATA_DISK = 2;
private final static NodeData[] NODES = {
new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB),
new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB),
@@ -93,6 +94,8 @@ public class MockNodeManager implements NodeManager {
private NetworkTopology clusterMap;
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
private int numHealthyDisksPerDatanode;
+ private int numRaftLogDisksPerDatanode;
+ private int numPipelinePerDatanode;
public MockNodeManager(NetworkTopologyImpl clusterMap,
List<DatanodeDetails> nodes,
@@ -123,6 +126,9 @@ public class MockNodeManager implements NodeManager {
safemode = false;
this.commandMap = new HashMap<>();
numHealthyDisksPerDatanode = 1;
+ numRaftLogDisksPerDatanode = 1;
+ numPipelinePerDatanode = numRaftLogDisksPerDatanode *
+ NUM_PIPELINE_PER_METADATA_DISK;
}
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
@@ -585,14 +591,34 @@ public class MockNodeManager implements NodeManager {
}
@Override
- public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+ public int minHealthyVolumeNum(List<DatanodeDetails> dnList) {
return numHealthyDisksPerDatanode;
}
+ @Override
+ public int pipelineLimit(DatanodeDetails dn) {
+ // by default 1 single node pipeline and 1 three node pipeline
+ return numPipelinePerDatanode;
+ }
+
+ @Override
+ public int minPipelineLimit(List<DatanodeDetails> dn) {
+ // by default 1 single node pipeline and 1 three node pipeline
+ return numPipelinePerDatanode;
+ }
+
+ public void setNumPipelinePerDatanode(int value) {
+ numPipelinePerDatanode = value;
+ }
+
public void setNumHealthyVolumes(int value) {
numHealthyDisksPerDatanode = value;
}
+ public void setNumMetaDataVolumes(int value) {
+ numRaftLogDisksPerDatanode = value;
+ }
+
/**
* A class to declare some values for the nodes so that our tests
* won't fail.
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 7a58d46..3f3c4ae 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -872,7 +872,7 @@ public class TestSCMNodeManager {
.getScmUsed().get());
assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
.getRemaining().get());
- assertEquals(1, nodeManager.getNumHealthyVolumes(dnList));
+ assertEquals(1, nodeManager.minHealthyVolumeNum(dnList));
dnList.clear();
}
}
@@ -917,7 +917,7 @@ public class TestSCMNodeManager {
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
assertEquals(volumeCount / 2,
- nodeManager.getNumHealthyVolumes(dnList));
+ nodeManager.minHealthyVolumeNum(dnList));
dnList.clear();
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 1274608..f024fc5 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -86,6 +86,7 @@ public class TestPipelinePlacementPolicy {
false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
conf = new OzoneConfiguration();
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT);
+ nodeManager.setNumPipelinePerDatanode(PIPELINE_LOAD_LIMIT);
stateManager = new PipelineStateManager();
placementPolicy = new PipelinePlacementPolicy(
nodeManager, stateManager, conf);
@@ -193,7 +194,8 @@ public class TestPipelinePlacementPolicy {
nodeManager.addPipeline(pipeline);
stateManager.addPipeline(pipeline);
} catch (SCMException e) {
- break;
+ throw e;
+ //break;
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 25957d8..d14e468 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -70,6 +70,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.InOrder;
+
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -275,6 +276,7 @@ public class TestSCMPipelineManager {
new SCMPipelineManager(conf, nodeManagerMock,
scmMetadataStore.getPipelineTable(), new EventQueue());
pipelineManager.allowPipelineCreation();
+ nodeManagerMock.setNumPipelinePerDatanode(1);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManagerMock,
pipelineManager.getStateManager(), conf);
@@ -336,6 +338,78 @@ public class TestSCMPipelineManager {
}
@Test
+ public void testPipelineLimit() throws Exception {
+ int numMetaDataVolumes = 2;
+ final OzoneConfiguration config = new OzoneConfiguration();
+ config.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+ false);
+ // turning off this config ensures that pipeline creation is determined
+ // by the metadata volume count.
+ config.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 0);
+ MockNodeManager nodeManagerMock = new MockNodeManager(true,
+ 3);
+ nodeManagerMock.setNumMetaDataVolumes(numMetaDataVolumes);
+ int pipelinePerDn = numMetaDataVolumes *
+ MockNodeManager.NUM_PIPELINE_PER_METADATA_DISK;
+ nodeManagerMock.setNumPipelinePerDatanode(pipelinePerDn);
+ SCMPipelineManager pipelineManager =
+ new SCMPipelineManager(config, nodeManagerMock,
+ scmMetadataStore.getPipelineTable(), new EventQueue());
+ pipelineManager.allowPipelineCreation();
+ PipelineProvider mockRatisProvider =
+ new MockRatisPipelineProvider(nodeManagerMock,
+ pipelineManager.getStateManager(), config);
+ pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+ mockRatisProvider);
+
+ MetricsRecordBuilder metrics = getMetrics(
+ SCMPipelineMetrics.class.getSimpleName());
+ long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
+ metrics);
+ Assert.assertEquals(0, numPipelineAllocated);
+
+ // max limit on no of pipelines is 4
+ for (int i = 0; i < pipelinePerDn; i++) {
+ Pipeline pipeline = pipelineManager
+ .createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ Assert.assertNotNull(pipeline);
+ }
+
+ metrics = getMetrics(
+ SCMPipelineMetrics.class.getSimpleName());
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertEquals(4, numPipelineAllocated);
+
+ long numPipelineCreateFailed = getLongCounter(
+ "NumPipelineCreationFailed", metrics);
+ Assert.assertEquals(0, numPipelineCreateFailed);
+ //This should fail...
+ try {
+ pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ fail();
+ } catch (SCMException ioe) {
+ // pipeline creation failed this time.
+
Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+ ioe.getResult());
+ }
+
+ metrics = getMetrics(
+ SCMPipelineMetrics.class.getSimpleName());
+ numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+ Assert.assertEquals(4, numPipelineAllocated);
+
+ numPipelineCreateFailed = getLongCounter(
+ "NumPipelineCreationFailed", metrics);
+ Assert.assertEquals(1, numPipelineCreateFailed);
+
+ // clean up
+ pipelineManager.close();
+ }
+
+ @Test
public void testActivateDeactivatePipeline() throws IOException {
final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index a9b879f..6d088fe 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -345,7 +345,17 @@ public class ReplicationNodeManagerMock implements
NodeManager {
}
@Override
- public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+ public int minHealthyVolumeNum(List<DatanodeDetails> dnList) {
+ return 0;
+ }
+
+ @Override
+ public int pipelineLimit(DatanodeDetails dn) {
+ return 0;
+ }
+
+ @Override
+ public int minPipelineLimit(List<DatanodeDetails> dn) {
return 0;
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
index 4fa5ea4..2d1acf7 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
@@ -72,6 +72,9 @@ public class TestContainerMapper {
dbPath = GenericTestUtils.getRandomizedTempPath();
conf.set(OZONE_OM_DB_DIRS, dbPath);
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, "100MB");
+ // By default, 2 pipelines are created. Setting the value to 6 will ensure
+ // each pipeline can have 3 containers open.
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 6);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.setScmId(SCM_ID)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]