liyuheng55555 commented on code in PR #12184:
URL: https://github.com/apache/iotdb/pull/12184#discussion_r1544161090


##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT2.java:
##########


Review Comment:
   As the integration framework requires, the name of an IT class should end
   with "IT" (this class name ends with "IT2").



##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT2.java:
##########


Review Comment:
   As the integration framework requires, the name of an IT class should end
   with "IT" (this class name ends with "IT2").



##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT.java:
##########
@@ -279,18 +273,17 @@ public void testMCFLeaderDistributionWithReadOnlyStatus() 
throws Exception {
             .forEach(
                 regionInfo -> {
                   if 
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
-                    leaderCounter
-                        .computeIfAbsent(regionInfo.getDataNodeId(), empty -> 
new AtomicInteger(0))
-                        .getAndIncrement();
+                    leaderCounter.merge(regionInfo.getDataNodeId(), 1, 
Integer::sum);
                   }
                 });
 
         // All DataNodes have Region-leader
-        isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
+        isDistributionBalanced = leaderCounter.size() == TEST_DATA_NODE_NUM;
         // Each DataNode has exactly 1 Region-leader
-        for (AtomicInteger leaderCount : leaderCounter.values()) {
-          if (leaderCount.get() != 1) {
+        for (Integer leaderCount : leaderCounter.values()) {
+          if (leaderCount != databaseNum / TEST_DATA_NODE_NUM) {
             isDistributionBalanced = false;
+            break;

Review Comment:
   Consider using Awaitility? It would make the code clearer.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java:
##########
@@ -153,57 +181,74 @@ private void constructMCFGraph() {
 
     /* Construct edges: sNode -> rNodes */
     for (int rNode : rNodeMap.values()) {
-      // Cost: 0
+      // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
       addAdjacentEdges(S_NODE, rNode, 1, 0);
     }
 
-    /* Construct edges: rNodes -> dNodes */
-    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
-      int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
-      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
-        int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
-        // Cost: 1 if the dNode is corresponded to the current leader of the 
rNode,
-        //       0 otherwise.
-        // Therefore, the RegionGroup will keep the leader as constant as 
possible.
-        int cost =
-            regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
-                    == dataNodeLocation.getDataNodeId()
-                ? 0
-                : 1;
-        addAdjacentEdges(rNode, dNode, 1, cost);
+    /* Construct edges: rNodes -> sdNodes */
+    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+        databaseRegionGroupMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+        int rNode = rNodeMap.get(regionGroupId);
+        for (TDataNodeLocation dataNodeLocation :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int dataNodeId = dataNodeLocation.getDataNodeId();
+          if (disabledDataNodeSet.contains(dataNodeId)) {
+            // Skip disabled DataNode
+            continue;
+          }
+          int sDNode = sDNodeMap.get(database).get(dataNodeId);
+          // Capacity: 1, Cost: 1 if sDNode is the current leader of the 
rNode, 0 otherwise.
+          // Therefore, the RegionGroup will keep the leader as constant as 
possible.
+          int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == 
dataNodeId ? 0 : 1;
+          addAdjacentEdges(rNode, sDNode, 1, cost);
+        }
       }
     }
 
-    /* Construct edges: dNodes -> tNode */
-    // Count the possible maximum number of leader in each DataNode
-    Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
-    regionReplicaSetMap
-        .values()
-        .forEach(
-            regionReplicaSet ->
-                regionReplicaSet
-                    .getDataNodeLocations()
-                    .forEach(
-                        dataNodeLocation ->
-                            maxLeaderCounter
-                                .computeIfAbsent(
-                                    dataNodeLocation.getDataNodeId(), empty -> 
new AtomicInteger(0))
-                                .getAndIncrement()));
-
-    for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
-      int dataNodeId = dNodeEntry.getKey();
-      int dNode = dNodeEntry.getValue();
-
-      if (disabledDataNodeSet.contains(dataNodeId)) {
-        // Skip disabled DataNode
-        continue;
+    /* Construct edges: sDNodes -> tDNodes */
+    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+        databaseRegionGroupMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      // Map<DataNodeId, leader number>
+      Map<Integer, Integer> leaderCounter = new TreeMap<>();
+      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+        for (TDataNodeLocation dataNodeLocation :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int dataNodeId = dataNodeLocation.getDataNodeId();
+          if (disabledDataNodeSet.contains(dataNodeId)) {
+            // Skip disabled DataNode
+            continue;
+          }
+          int sDNode = sDNodeMap.get(database).get(dataNodeId);
+          int tDNode = tDNodeMap.get(dataNodeId);
+          int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+          // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+          // Thus, the leader distribution will be as balance as possible 
within each Database
+          // based on the Jensen's-Inequality.
+          addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
+        }
       }
+    }
 
-      int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
-      for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+    /* Construct edges: tDNodes -> tNode */
+    // Map<DataNodeId, possible max leader> Count the possible maximum number 
of leader in each
+    // DataNode

Review Comment:
   Could you make the comment format consistent?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to