jiajunwang commented on a change in pull request #639: Refine the WAGED rebalancer to minimize the partial rebalance workload.
URL: https://github.com/apache/helix/pull/639#discussion_r352895899
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 ##########
 @@ -93,42 +187,126 @@ public static ClusterModel generateClusterModel(ResourceControllerDataProvider d
     return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
   }
 
+  // Filter the replicas map so only the replicas that have been allocated in the existing
+  // assignmentMap remain in the map.
+  private static void removeUnknownReplicas(Map<String, Set<AssignableReplica>> replicaMap,
+      Map<String, ResourceAssignment> assignmentMap) {
+    replicaMap.entrySet().parallelStream().forEach(replicaSetEntry -> {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          getStateInstanceMap(assignmentMap.get(replicaSetEntry.getKey()));
+      // Iterate the replicas of the resource to find the ones that require reallocating.
+      Iterator<AssignableReplica> replicaIter = replicaSetEntry.getValue().iterator();
+      while (replicaIter.hasNext()) {
+        AssignableReplica replica = replicaIter.next();
+        Set<String> validInstances =
+            stateInstanceMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap())
+                .getOrDefault(replica.getReplicaState(), Collections.emptySet());
+        if (validInstances.isEmpty()) {
+          // Removing by comparing with the baseline assignment.
+          replicaIter.remove();
+        } else {
+          // Cleanup the state map record, so the selected instance won't be picked up again for
+          // the other replica checkup.
+          validInstances.remove(validInstances.iterator().next());
+        }
+      }
+    });
+  }
+
   /**
-   * Generate a cluster model based on the current state output and data cache.
-   * @param dataProvider           The controller's data cache.
-   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
-   *                               resources that are not in this list will be removed from the
-   *                               final assignment.
-   * @param currentStateAssignment The resource assignment built from current state output.
-   * @return A cluster model based on the current state and data cache.
+   * Find the minimum set of replicas that need to be reassigned by comparing with the Baseline
+   * assignment.
+   * A replica needs to be reassigned if one of the following conditions is true:
+   * 1. The assignments in the Baseline and the Best possible assignment are different. And the
+   * assignment in the Baseline is valid. So it is worthwhile to move it.
+   * 2. The assignments is not in the Baseline or the Best possible assignment.
+   * Otherwise, the rebalancer just keeps the current Best possible assignment allocation.
+   *
+   * @param replicaMap             A map contains all the replicas grouped by resource name.
+   * @param activeInstances        All the instances that are live and enabled according to the delay rebalance configuration.
+   * @param baselineAssignment     The baseline assignment.
+   * @param bestPossibleAssignment The current best possible assignment.
+   * @param allocatedReplicas      Return the allocated replicas grouped by the target instance name.
+   * @return The replicas that need to be reassigned.
    */
-  public static ClusterModel generateClusterModelFromCurrentState(
-      ResourceControllerDataProvider dataProvider,
-      Map<String, Resource> resourceMap,
-      Map<String, ResourceAssignment> currentStateAssignment) {
-    return generateClusterModel(dataProvider, resourceMap, dataProvider.getEnabledLiveInstances(),
-        Collections.emptyMap(), Collections.emptyMap(), currentStateAssignment);
+  private static Set<AssignableReplica> findToBeAssignedReplicasByComparingBaseline(
+      Map<String, Set<AssignableReplica>> replicaMap, Set<String> activeInstances,
+      Map<String, ResourceAssignment> baselineAssignment,
+      Map<String, ResourceAssignment> bestPossibleAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    // check each resource to identify the allocated replicas and to-be-assigned replicas.
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> baselinePartitionStateMap =
+          getValidStateInstanceMap(baselineAssignment.get(resourceName), activeInstances);
+      Map<String, Map<String, Set<String>>> bestPossiblePartitionStateMap =
+          getValidStateInstanceMap(bestPossibleAssignment.get(resourceName), activeInstances);
+      // Iterate the replicas of the resource to find the ones that require reallocating.
+      for (AssignableReplica replica : replicaMap.get(resourceName)) {
+        String partitionName = replica.getPartitionName();
+        String replicaState = replica.getReplicaState();
+        // Find the allocation in the baseline
+        Set<String> baselineAllocations =
+            baselinePartitionStateMap.getOrDefault(partitionName, Collections.emptyMap())
+                .getOrDefault(replicaState, Collections.emptySet());
+        Set<String> bestPossibleAllocations =
+            bestPossiblePartitionStateMap.getOrDefault(partitionName, Collections.emptyMap())
+                .getOrDefault(replicaState, Collections.emptySet());
+
+        // Compare between the baseline and best possible assignments.
+        List<String> commonAllocations = new ArrayList<>(bestPossibleAllocations);
+        commonAllocations.retainAll(baselineAllocations);
+        if (!commonAllocations.isEmpty()) {
+          // 1. If the partition is allocated at the same location in both baseline and best possible
+          // assignment, there is no need to reassign it.
+          String allocatedInstance = commonAllocations.get(0);
+          allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new HashSet<>()).add(replica);
+          // clean up the record to prevent the same location being processed again.
+          baselineAllocations.remove(allocatedInstance);
+          bestPossibleAllocations.remove(allocatedInstance);
+        } else if (!baselineAllocations.isEmpty()) {
+          // 2. If the partition is allocated at an active instance in the Baseline but the
+          // allocation does not exist in the best possible assignment, try to rebalance it.
+          toBeAssignedReplicas.add(replica);
+          // clean up the baseline record to prevent the same location being picked up again.
 
 Review comment:
   Completely necessary. Please check the code carefully.
   We loop over the replica set first, and the inner instance iteration is reset for every replica, so without the cleanup the same location would be picked up again. The current loop is not efficient, but it is the cleanest way.
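
   To illustrate the point about the inner iteration being reset, here is a minimal, self-contained sketch. It is not the code from this PR: `ReplicaMatchSketch`, `assign`, the `cleanUp` flag, and the replica and node names are all hypothetical, and replicas are reduced to plain strings. It only shows what happens when two replicas of the same partition and state share one allocation set.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not part of ClusterModelProvider.
class ReplicaMatchSketch {
  // Matches each replica of one partition/state to an instance taken from the
  // shared allocation set. When cleanUp is false, the picked instance stays in the set.
  static Map<String, String> assign(List<String> replicas, Set<String> allocations,
      boolean cleanUp) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String replica : replicas) {
      if (allocations.isEmpty()) {
        continue; // no recorded location left for this replica
      }
      // The instance iteration restarts from the beginning for every replica.
      String instance = allocations.iterator().next();
      result.put(replica, instance);
      if (cleanUp) {
        // Remove the record so the next replica cannot pick the same location.
        allocations.remove(instance);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> replicas = Arrays.asList("replica-0", "replica-1");
    // Without the cleanup, both replicas land on node-A.
    System.out.println(
        assign(replicas, new TreeSet<>(Arrays.asList("node-A", "node-B")), false));
    // With the cleanup, each replica gets its own node.
    System.out.println(
        assign(replicas, new TreeSet<>(Arrays.asList("node-A", "node-B")), true));
  }
}
```

   A `TreeSet` is used here only to make the iteration order deterministic for the example. The trade-off is the one mentioned above: the per-replica lookup is repeated, but the bookkeeping stays simple.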

