bshashikant commented on a change in pull request #1371:
URL: https://github.com/apache/hadoop-ozone/pull/1371#discussion_r488543079
##########
File path:
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
##########
@@ -98,8 +105,65 @@ private boolean exceedPipelineNumberLimit(ReplicationFactor
factor) {
return false;
}
+ private Map<DatanodeDetails, Integer> getSuggestedLeaderCount(
+ List<DatanodeDetails> dns) {
+ Map<DatanodeDetails, Integer> suggestedLeaderCount = new HashMap<>();
+ for (DatanodeDetails dn : dns) {
+ suggestedLeaderCount.put(dn, 0);
+
+ Set<PipelineID> pipelineIDSet = getNodeManager().getPipelines(dn);
+ for (PipelineID pipelineID : pipelineIDSet) {
+ try {
+ Pipeline pipeline =
getPipelineStateManager().getPipeline(pipelineID);
+ if (!pipeline.isClosed()
+ && dn.getUuid().equals(pipeline.getSuggestedLeaderId())) {
+ suggestedLeaderCount.put(dn, suggestedLeaderCount.get(dn) + 1);
+ }
+ } catch (PipelineNotFoundException e) {
+ LOG.debug("Pipeline not found in pipeline state manager : {}",
+ pipelineID, e);
+ }
+ }
+ }
+
+ return suggestedLeaderCount;
+ }
+
+ private DatanodeDetails getSuggestedLeader(List<DatanodeDetails> dns) {
+ Map<DatanodeDetails, Integer> suggestedLeaderCount =
Review comment:
I think suggested leader selection can be made a policy-driven change:
1) The default policy can be the minimum leader-election count.
2) It can also be driven by factors like memory/resource availability on a
datanode.
3) It can be determined by the topology as well: the node nearest to the client
can be made the leader.
It's better to make it a pluggable model like this.
##########
File path:
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
##########
@@ -92,11 +98,128 @@ public void
testAutomaticPipelineCreationOnPipelineDestroy()
waitForPipelines(2);
}
+ private void checkLeaderBalance(int dnNum, int leaderNumOfEachDn)
+ throws Exception {
+ List<Pipeline> pipelines = pipelineManager
+ .getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
+
+ for (Pipeline pipeline : pipelines) {
+ LambdaTestUtils.await(30000, 500, () ->
+ pipeline.getLeaderId().equals(pipeline.getSuggestedLeaderId()));
+ }
+
+ Map<UUID, Integer> leaderCount = new HashMap<>();
+ for (Pipeline pipeline : pipelines) {
+ UUID leader = pipeline.getLeaderId();
+ if (!leaderCount.containsKey(leader)) {
+ leaderCount.put(leader, 0);
+ }
+
+ leaderCount.put(leader, leaderCount.get(leader) + 1);
+ }
+
+ Assert.assertTrue(leaderCount.size() == dnNum);
+ for (UUID key : leaderCount.keySet()) {
+ Assert.assertTrue(leaderCount.get(key) == leaderNumOfEachDn);
+ }
+ }
+
+ @Test(timeout = 360000)
+ public void testRestoreSuggestedLeader() throws Exception {
+ conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+ int dnNum = 3;
+ int dnPipelineLimit = 3;
+ int leaderNumOfEachDn = dnPipelineLimit / dnNum;
+ int pipelineNum = 3;
+
+ init(dnNum, dnPipelineLimit);
+ // make sure two pipelines are created
+ waitForPipelines(pipelineNum);
+ // No Factor ONE pipeline is auto created.
+ Assert.assertEquals(0, pipelineManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE).size());
+
+ // pipelineNum pipelines in 3 datanodes,
+ // each datanode has leaderNumOfEachDn leaders after balance
+ checkLeaderBalance(dnNum, leaderNumOfEachDn);
+ List<Pipeline> pipelinesBeforeRestart =
+ cluster.getStorageContainerManager().getPipelineManager()
+ .getPipelines();
+
+ cluster.restartStorageContainerManager(true);
+
+ checkLeaderBalance(dnNum, leaderNumOfEachDn);
+ List<Pipeline> pipelinesAfterRestart =
+ cluster.getStorageContainerManager().getPipelineManager()
+ .getPipelines();
+
+ Assert.assertEquals(
+ pipelinesBeforeRestart.size(), pipelinesAfterRestart.size());
+
+ for (Pipeline p : pipelinesBeforeRestart) {
+ boolean equal = false;
+ for (Pipeline q : pipelinesAfterRestart) {
+ if (p.getId().equals(q.getId())
+ && p.getSuggestedLeaderId().equals(q.getSuggestedLeaderId())) {
+ equal = true;
+ }
+ }
+
+ Assert.assertTrue(equal);
+ }
+ }
+
+ @Test(timeout = 360000)
+ public void testPipelineLeaderBalance() throws Exception {
+ conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+ int dnNum = 3;
+ int dnPipelineLimit = 3;
+ int leaderNumOfEachDn = dnPipelineLimit / dnNum;
+ int pipelineNum = 3;
+
+ init(dnNum, dnPipelineLimit);
+ // make sure two pipelines are created
+ waitForPipelines(pipelineNum);
+ // No Factor ONE pipeline is auto created.
+ Assert.assertEquals(0, pipelineManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE).size());
+
+ // pipelineNum pipelines in 3 datanodes,
+ // each datanode has leaderNumOfEachDn leaders after balance
+ checkLeaderBalance(dnNum, leaderNumOfEachDn);
+
+ //cluster.restartStorageContainerManager(true);
Review comment:
Can we remove these commented-out lines?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]