CRZbulabula commented on code in PR #12184:
URL: https://github.com/apache/iotdb/pull/12184#discussion_r1544526934


##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT2.java:
##########


Review Comment:
   Fixed



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java:
##########
@@ -153,57 +181,74 @@ private void constructMCFGraph() {
 
     /* Construct edges: sNode -> rNodes */
     for (int rNode : rNodeMap.values()) {
-      // Cost: 0
+      // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
       addAdjacentEdges(S_NODE, rNode, 1, 0);
     }
 
-    /* Construct edges: rNodes -> dNodes */
-    for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
-      int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
-      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
-        int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
-        // Cost: 1 if the dNode is corresponded to the current leader of the 
rNode,
-        //       0 otherwise.
-        // Therefore, the RegionGroup will keep the leader as constant as 
possible.
-        int cost =
-            regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
-                    == dataNodeLocation.getDataNodeId()
-                ? 0
-                : 1;
-        addAdjacentEdges(rNode, dNode, 1, cost);
+    /* Construct edges: rNodes -> sdNodes */
+    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+        databaseRegionGroupMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+        int rNode = rNodeMap.get(regionGroupId);
+        for (TDataNodeLocation dataNodeLocation :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int dataNodeId = dataNodeLocation.getDataNodeId();
+          if (disabledDataNodeSet.contains(dataNodeId)) {
+            // Skip disabled DataNode
+            continue;
+          }
+          int sDNode = sDNodeMap.get(database).get(dataNodeId);
+          // Capacity: 1, Cost: 1 if sDNode is the current leader of the 
rNode, 0 otherwise.
+          // Therefore, the RegionGroup will keep the leader as constant as 
possible.
+          int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == 
dataNodeId ? 0 : 1;
+          addAdjacentEdges(rNode, sDNode, 1, cost);
+        }
       }
     }
 
-    /* Construct edges: dNodes -> tNode */
-    // Count the possible maximum number of leader in each DataNode
-    Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
-    regionReplicaSetMap
-        .values()
-        .forEach(
-            regionReplicaSet ->
-                regionReplicaSet
-                    .getDataNodeLocations()
-                    .forEach(
-                        dataNodeLocation ->
-                            maxLeaderCounter
-                                .computeIfAbsent(
-                                    dataNodeLocation.getDataNodeId(), empty -> 
new AtomicInteger(0))
-                                .getAndIncrement()));
-
-    for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
-      int dataNodeId = dNodeEntry.getKey();
-      int dNode = dNodeEntry.getValue();
-
-      if (disabledDataNodeSet.contains(dataNodeId)) {
-        // Skip disabled DataNode
-        continue;
+    /* Construct edges: sDNodes -> tDNodes */
+    for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+        databaseRegionGroupMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      // Map<DataNodeId, leader number>
+      Map<Integer, Integer> leaderCounter = new TreeMap<>();
+      for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+        for (TDataNodeLocation dataNodeLocation :
+            regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+          int dataNodeId = dataNodeLocation.getDataNodeId();
+          if (disabledDataNodeSet.contains(dataNodeId)) {
+            // Skip disabled DataNode
+            continue;
+          }
+          int sDNode = sDNodeMap.get(database).get(dataNodeId);
+          int tDNode = tDNodeMap.get(dataNodeId);
+          int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+          // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+          // Thus, the leader distribution will be as balance as possible 
within each Database
+          // based on the Jensen's-Inequality.
+          addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
+        }
       }
+    }
 
-      int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
-      for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+    /* Construct edges: tDNodes -> tNode */
+    // Map<DataNodeId, possible max leader> Count the possible maximum number 
of leader in each
+    // DataNode

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to