jiajunwang commented on a change in pull request #639: Refine the WAGED
rebalancer to minimize the partial rebalance workload.
URL: https://github.com/apache/helix/pull/639#discussion_r359107757
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
##########
@@ -93,42 +184,126 @@ public static ClusterModel
generateClusterModel(ResourceControllerDataProvider d
return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
}
+ // Filter the replicas map so only the replicas that have been allocated in
the existing
+ // assignmentMap remain in the map.
+ private static void retainExistingReplicas(Map<String,
Set<AssignableReplica>> replicaMap,
+ Map<String, ResourceAssignment> assignmentMap) {
+ replicaMap.entrySet().parallelStream().forEach(replicaSetEntry -> {
+ // <partition, <state, instances set>>
+ Map<String, Map<String, Set<String>>> stateInstanceMap =
+ getStateInstanceMap(assignmentMap.get(replicaSetEntry.getKey()));
+ // Iterate the replicas of the resource to find the ones that require
reallocating.
+ Iterator<AssignableReplica> replicaIter =
replicaSetEntry.getValue().iterator();
+ while (replicaIter.hasNext()) {
+ AssignableReplica replica = replicaIter.next();
+ Set<String> validInstances =
+ stateInstanceMap.getOrDefault(replica.getPartitionName(),
Collections.emptyMap())
+ .getOrDefault(replica.getReplicaState(),
Collections.emptySet());
+ if (validInstances.isEmpty()) {
+ // Removing by comparing with the baseline assignment.
+ replicaIter.remove();
+ } else {
+ // Remove the instance from the state map record, so it won't be
picked up again for
+ // the other replica checkup.
+ validInstances.remove(validInstances.iterator().next());
+ }
+ }
+ });
+ }
+
/**
- * Generate a cluster model based on the current state output and data cache.
- * @param dataProvider The controller's data cache.
- * @param resourceMap The full list of the resources to be
rebalanced. Note that any
- * resources that are not in this list will be
removed from the
- * final assignment.
- * @param currentStateAssignment The resource assignment built from current
state output.
- * @return A cluster model based on the current state and data cache.
+ * Find the minimum set of replicas that need to be reassigned by comparing
the Best
+ * possible assignment with the Baseline assignment.
+ * A replica needs to be reassigned if either of the following conditions is
true:
+ * 1. The partition allocation in the Baseline and the Best possible
assignment are different.
+ * And the allocation in the Baseline is valid. So it is worthwhile to move
it.
+ * 2. The partition allocation is not in the Baseline or the Best possible
assignment.
+ * Otherwise, the rebalancer just keeps the current Best possible assignment
allocation.
+ *
+ * @param replicaMap A map contains all the replicas grouped by
resource name.
+ * @param activeInstances All the instances that are live and enabled
according to the delay rebalance configuration.
+ * @param baselineAssignment The baseline assignment.
+ * @param bestPossibleAssignment The current best possible assignment.
+ * @param allocatedReplicas Return the allocated replicas grouped by
the target instance name.
+ * @return The replicas that need to be reassigned.
*/
- public static ClusterModel generateClusterModelFromCurrentState(
- ResourceControllerDataProvider dataProvider,
- Map<String, Resource> resourceMap,
- Map<String, ResourceAssignment> currentStateAssignment) {
- return generateClusterModel(dataProvider, resourceMap,
dataProvider.getEnabledLiveInstances(),
- Collections.emptyMap(), Collections.emptyMap(),
currentStateAssignment);
+ private static Set<AssignableReplica>
findToBeAssignedReplicasByComparingBaseline(
+ Map<String, Set<AssignableReplica>> replicaMap, Set<String>
activeInstances,
+ Map<String, ResourceAssignment> baselineAssignment,
+ Map<String, ResourceAssignment> bestPossibleAssignment,
+ Map<String, Set<AssignableReplica>> allocatedReplicas) {
+ Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+ // check each resource to identify the allocated replicas and
to-be-assigned replicas.
+ for (String resourceName : replicaMap.keySet()) {
+ // <partition, <state, instances set>>
+ Map<String, Map<String, Set<String>>> baselinePartitionStateMap =
+ getValidStateInstanceMap(baselineAssignment.get(resourceName),
activeInstances);
+ Map<String, Map<String, Set<String>>> bestPossiblePartitionStateMap =
+ getValidStateInstanceMap(bestPossibleAssignment.get(resourceName),
activeInstances);
+ // Iterate the replicas of the resource to find the ones that require
reallocating.
+ for (AssignableReplica replica : replicaMap.get(resourceName)) {
+ String partitionName = replica.getPartitionName();
+ String replicaState = replica.getReplicaState();
+ Set<String> baselineAllocations =
+ baselinePartitionStateMap.getOrDefault(partitionName,
Collections.emptyMap())
+ .getOrDefault(replicaState, Collections.emptySet());
+ Set<String> bestPossibleAllocations =
+ bestPossiblePartitionStateMap.getOrDefault(partitionName,
Collections.emptyMap())
+ .getOrDefault(replicaState, Collections.emptySet());
+
+ // Compare the best possible assignments with the baseline assignment
for the common part.
+ List<String> commonAllocations = new
ArrayList<>(bestPossibleAllocations);
+ commonAllocations.retainAll(baselineAllocations);
+ if (!commonAllocations.isEmpty()) {
+ // 1. If the partition is allocated at the same location in both
baseline and best possible
+ // assignment, there is no need to reassign it.
+ String allocatedInstance = commonAllocations.get(0);
+ allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new
HashSet<>()).add(replica);
+ // Remove the instance from the record to prevent the same location
being processed again.
+ baselineAllocations.remove(allocatedInstance);
+ bestPossibleAllocations.remove(allocatedInstance);
+ } else if (!baselineAllocations.isEmpty()) {
+ // 2. If the partition is allocated at an active instance in the
Baseline but the
+ // allocation does not exist in the best possible assignment, try to
rebalance it.
+ toBeAssignedReplicas.add(replica);
+ // Remove the instance from the baseline record to prevent the same
location being picked
+ // up again.
+ baselineAllocations.remove(baselineAllocations.iterator().next());
+ } else if (!bestPossibleAllocations.isEmpty()) {
+ // 3. If the partition is allocated at an active instance in the
best possible assignment
+ // only, there is no need to rebalance it.
+ String allocatedInstance = bestPossibleAllocations.iterator().next();
+ allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new
HashSet<>()).add(replica);
+ // Remove the instance from the record to prevent the same location
being processed again.
+ bestPossibleAllocations.remove(allocatedInstance);
+ } else {
+ // 4. If the partition is completely new, rebalance it.
Review comment:
Sounds good.Changed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]