CRZbulabula commented on code in PR #12184:
URL: https://github.com/apache/iotdb/pull/12184#discussion_r1544526934
##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBRegionGroupLeaderDistributionIT2.java:
##########
Review Comment:
Fixed
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java:
##########
@@ -153,57 +181,74 @@ private void constructMCFGraph() {
/* Construct edges: sNode -> rNodes */
for (int rNode : rNodeMap.values()) {
- // Cost: 0
+ // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader
addAdjacentEdges(S_NODE, rNode, 1, 0);
}
- /* Construct edges: rNodes -> dNodes */
- for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) {
- int rNode = rNodeMap.get(regionReplicaSet.getRegionId());
- for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
- int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId());
- // Cost: 1 if the dNode is corresponded to the current leader of the
rNode,
- // 0 otherwise.
- // Therefore, the RegionGroup will keep the leader as constant as
possible.
- int cost =
- regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1)
- == dataNodeLocation.getDataNodeId()
- ? 0
- : 1;
- addAdjacentEdges(rNode, dNode, 1, cost);
+ /* Construct edges: rNodes -> sdNodes */
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+ int rNode = rNodeMap.get(regionGroupId);
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ // Capacity: 1, Cost: 0 if sDNode is the current leader of the
rNode, 1 otherwise.
+ // Therefore, the RegionGroup will keep the leader as constant as
possible.
+ int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) ==
dataNodeId ? 0 : 1;
+ addAdjacentEdges(rNode, sDNode, 1, cost);
+ }
}
}
- /* Construct edges: dNodes -> tNode */
- // Count the possible maximum number of leader in each DataNode
- Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>();
- regionReplicaSetMap
- .values()
- .forEach(
- regionReplicaSet ->
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation ->
- maxLeaderCounter
- .computeIfAbsent(
- dataNodeLocation.getDataNodeId(), empty ->
new AtomicInteger(0))
- .getAndIncrement()));
-
- for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) {
- int dataNodeId = dNodeEntry.getKey();
- int dNode = dNodeEntry.getValue();
-
- if (disabledDataNodeSet.contains(dataNodeId)) {
- // Skip disabled DataNode
- continue;
+ /* Construct edges: sDNodes -> tDNodes */
+ for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry :
+ databaseRegionGroupMap.entrySet()) {
+ String database = databaseEntry.getKey();
+ // Map<DataNodeId, leader number>
+ Map<Integer, Integer> leaderCounter = new TreeMap<>();
+ for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) {
+ for (TDataNodeLocation dataNodeLocation :
+ regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) {
+ int dataNodeId = dataNodeLocation.getDataNodeId();
+ if (disabledDataNodeSet.contains(dataNodeId)) {
+ // Skip disabled DataNode
+ continue;
+ }
+ int sDNode = sDNodeMap.get(database).get(dataNodeId);
+ int tDNode = tDNodeMap.get(dataNodeId);
+ int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum);
+ // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode.
+ // Thus, the leader distribution will be as balanced as possible
within each Database,
+ // according to Jensen's inequality.
+ addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount);
+ }
}
+ }
- int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get();
- for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) {
+ /* Construct edges: tDNodes -> tNode */
+ // Map<DataNodeId, possible max leader>: count the possible maximum number
of leaders on each
+ // DataNode
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]