HxpSerein commented on code in PR #15282:
URL: https://github.com/apache/iotdb/pull/15282#discussion_r2049813784
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java:
##########
@@ -125,21 +125,241 @@ public TRegionReplicaSet
generateOptimalRegionReplicasDistribution(
}
}
+ @Override
+ public Map<TConsensusGroupId, TDataNodeConfiguration>
removeNodeReplicaSelect(
+ Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+ Map<Integer, Double> freeDiskSpaceMap,
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ Map<TConsensusGroupId, String> regionDatabaseMap,
+ Map<String, List<TRegionReplicaSet>> databaseAllocatedRegionGroupMap,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ try {
+ // 1. prepare: compute regionCounter, databaseRegionCounter, and
combinationCounter
+
+ List<TRegionReplicaSet> databaseAllocatedRegionGroups =
+ new ArrayList<>(databaseAllocatedRegionGroupMap.values()).get(0);
+ prepare(availableDataNodeMap, allocatedRegionGroups,
databaseAllocatedRegionGroups);
+
+ // 2. Build allowed candidate set for each region that needs to be
migrated.
+ // For each region in remainReplicasMap, the candidate destination nodes
are all nodes in
+ // availableDataNodeMap
+ // excluding those already in the remain replica set.
+ List<TConsensusGroupId> regionKeys = new
ArrayList<>(remainReplicasMap.keySet());
+ Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap = new
HashMap<>();
+ for (TConsensusGroupId regionId : regionKeys) {
+ TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId);
+ Set<Integer> notAllowedNodes = new HashSet<>();
+
+ // Exclude nodes already present in the remain replica set
+ for (TDataNodeLocation location :
remainReplicaSet.getDataNodeLocations()) {
+ notAllowedNodes.add(location.getDataNodeId());
+ }
+
+ // Allowed candidates are the nodes not in the exclusion set
+ List<Integer> candidates =
+ availableDataNodeMap.keySet().stream()
+ .filter(nodeId -> !notAllowedNodes.contains(nodeId))
+ .sorted(
+ (a, b) -> {
+ int cmp = Integer.compare(regionCounter[a],
regionCounter[b]);
+ if (cmp == 0) {
+ cmp = Integer.compare(databaseRegionCounter[a],
databaseRegionCounter[b]);
+ }
+ return cmp;
+ })
+ .collect(Collectors.toList());
+
+ // Sort candidates in ascending order of current global load
(regionCounter)
+ allowedCandidatesMap.put(regionId, candidates);
+ }
+
+ // Optionally, sort regionKeys by the size of its candidate list
(smaller candidate sets
+ // first)
+ regionKeys.sort(Comparator.comparingInt(id ->
allowedCandidatesMap.get(id).size()));
+
+ int n = regionKeys.size();
+ // Each element holds the candidate nodeId chosen for the region at that
index
+ int[] currentAssignment = new int[n];
+ // additionalLoad holds the number of extra regions assigned to each
node in this migration
+ // solution.
+ int[] additionalLoad = new int[regionCounter.length];
+
+ // 3. Create a buffer for candidate solutions
+ List<int[]> optimalAssignments = new ArrayList<>();
+ // bestMetrics holds the best found metrics: [maxGlobalLoad,
maxDatabaseLoad, scatterValue].
+ // Initialize to high values.
+ int[] bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE,
Integer.MAX_VALUE};
+
+ dfsRemoveNodeReplica(
+ 0,
+ regionKeys,
+ allowedCandidatesMap,
+ currentAssignment,
+ additionalLoad,
+ optimalAssignments,
+ bestMetrics,
+ remainReplicasMap);
+
+ // 4. Randomly select one solution from the candidate buffer
+ if (optimalAssignments.isEmpty()) {
+ // This should not happen if there is at least one valid assignment
+ return Collections.emptyMap();
+ }
+ Collections.shuffle(optimalAssignments);
+ int[] bestAssignment = optimalAssignments.get(0);
+
+ // 5. Build and return the result mapping: region -> chosen destination
TDataNodeConfiguration
+ Map<TConsensusGroupId, TDataNodeConfiguration> result = new HashMap<>();
+ for (int i = 0; i < n; i++) {
+ TConsensusGroupId regionId = regionKeys.get(i);
+ int chosenNodeId = bestAssignment[i];
+ result.put(regionId, availableDataNodeMap.get(chosenNodeId));
+ }
+ return result;
+ } finally {
+ // Clear any temporary state to avoid impacting subsequent calls
+ clear();
+ }
+ }
+
+ /**
+ * DFS method that searches for migration target assignments.
+ *
+ * <p>It enumerates all possible assignments (one candidate for each region)
and collects
+ * candidate solutions in the optimalAssignments buffer. The evaluation
metrics for each complete
+ * assignment (i.e. when index == regionKeys.size()) are:
+ *
+ * <p>1. Max global load: the maximum over nodes of (regionCounter[node] +
additionalLoad[node])
+ * 2. Max database load: the maximum over nodes of
(databaseRegionCounter[node] +
+ * additionalLoad[node]) 3. Scatter value: computed per region, summing the
combinationCounter for
+ * every pair in the complete replica set. The complete replica set for a
region includes nodes in
+ * its remain replica set plus the newly assigned node.
+ *
+ * <p>The candidates are compared lexicographically (first by global load,
then by database load,
+ * then by scatter). When a better candidate is found, the
optimalAssignments buffer is cleared
+ * and updated; if the new candidate matches the best found metrics, it is
added to the buffer.
+ *
+ * <p>DFS search is pruned if the optimalAssignments buffer reaches CAPACITY.
+ *
+ * @param index Current DFS level, corresponding to regionKeys.get(index)
+ * @param regionKeys A list of regions that need to be migrated.
+ * @param allowedCandidatesMap For each region, the allowed candidate
destination node IDs.
+ * @param currentAssignment Current partial assignment; its length equals
the number of regions.
+ * @param additionalLoad Extra load currently assigned to each node.
+ * @param optimalAssignments Buffer holding candidate assignment arrays.
+ * @param bestMetrics An int array holding the best metrics found so far:
[maxGlobalLoad,
+ * maxDatabaseLoad, scatterValue].
+ * @param remainReplicasMap Mapping from region to its current remain
replica set.
+ */
+ private void dfsRemoveNodeReplica(
+ int index,
+ List<TConsensusGroupId> regionKeys,
+ Map<TConsensusGroupId, List<Integer>> allowedCandidatesMap,
+ int[] currentAssignment,
+ int[] additionalLoad,
+ List<int[]> optimalAssignments,
+ int[] bestMetrics,
+ Map<TConsensusGroupId, TRegionReplicaSet> remainReplicasMap) {
+ int n = regionKeys.size();
+ if (index == n) {
+ // A complete assignment has been generated.
+ // Compute metrics for this complete migration assignment.
+
+ // Compute the scatter value for the complete assignment.
+ int currentScatter = 0;
+ // For each region, calculate the scatter based on the
combinationCounter among all nodes
+ // in the full replica set (which includes the nodes in the remain
replica and the new
+ // candidate).
+ for (int r = 0; r < n; r++) {
+ TConsensusGroupId regionId = regionKeys.get(r);
+ for (TDataNodeLocation location :
remainReplicasMap.get(regionId).getDataNodeLocations()) {
+ int nodeA = currentAssignment[r];
+ int nodeB = location.getDataNodeId();
+ currentScatter += combinationCounter[nodeA][nodeB];
+ }
+ }
+
+ // Compute the maximum global load and maximum database load among all
nodes that received
+ // additional load.
+ int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter);
+
+ // Lexicographically compare currentMetrics with bestMetrics.
+ // If currentMetrics is better than bestMetrics, update bestMetrics and
clear the candidate
+ // buffer.
+ boolean isBetter = false;
+ boolean isEqual = true;
+ for (int i = 0; i < 3; i++) {
+ if (currentMetrics[i] < bestMetrics[i]) {
+ isBetter = true;
+ isEqual = false;
+ break;
+ } else if (currentMetrics[i] > bestMetrics[i]) {
+ isEqual = false;
+ break;
+ }
+ }
+ if (isBetter) {
+ bestMetrics[0] = currentMetrics[0];
+ bestMetrics[1] = currentMetrics[1];
+ bestMetrics[2] = currentMetrics[2];
+ optimalAssignments.clear();
+ optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
+ } else if (isEqual) {
+ optimalAssignments.add(Arrays.copyOf(currentAssignment, n));
+ // Prune search if we already have enough candidate solutions
+ if (optimalAssignments.size() >= GCR_MAX_OPTIMAL_PLAN_NUM) {
Review Comment:
Yes, you are right. The DFS explores candidate assignments to find a good
solution, but once the search is pruned it cannot guarantee that the result is
globally optimal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]