qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173896583
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -311,30 +311,70 @@ public static Set<AssignableReplica>
findToBeAssignedReplicasForMinActiveReplica
Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
for (String resourceName : resources) {
- // <partition, <state, instances set>>
- Map<String, Map<String, Set<String>>> stateInstanceMap =
-
ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
ResourceAssignment resourceAssignment =
currentAssignment.get(resourceName);
- String modelDef =
clusterData.getIdealState(resourceName).getStateModelDefRef();
+ IdealState idealState = clusterData.getIdealState(resourceName);
+ String modelDef = idealState.getStateModelDefRef();
Map<String, Integer> statePriorityMap =
clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+ ResourceConfig mergedResourceConfig =
+
ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
idealState);
+
// keep all current assignment and add to allocated replicas
resourceAssignment.getMappedPartitions().forEach(partition ->
resourceAssignment.getReplicaMap(partition).forEach((instance,
state) ->
allocatedReplicas.computeIfAbsent(instance, key -> new
HashSet<>())
- .add(new AssignableReplica(clusterData.getClusterConfig(),
clusterData.getResourceConfig(resourceName),
+ .add(new AssignableReplica(clusterData.getClusterConfig(),
mergedResourceConfig,
partition.getPartitionName(), state,
statePriorityMap.get(state)))));
// only proceed for resource requiring delayed rebalance overwrites
List<String> partitions =
partitionsMissingMinActiveReplicas.getOrDefault(resourceName,
Collections.emptyList());
if (partitions.isEmpty()) {
continue;
}
+ // <partition, <state, instances set>>
+ Map<String, Map<String, Set<String>>> stateInstanceMap =
+ ClusterModelProvider.getStateInstanceMap(resourceAssignment);
toBeAssignedReplicas.addAll(
findAssignableReplicaForResource(clusterData, resourceName,
partitions, stateInstanceMap, liveEnabledInstances));
}
return toBeAssignedReplicas;
}
+ /**
+ * Merge entries from currentResourceAssignment to newAssignment.
+ * To handle minActiveReplica for delayed rebalance, new assignment is
computed based on enabled live instances, but
+ * could miss out current partition allocation still on offline instances
(within delayed window).
+ * The merge process is independent for each resource; for each
resource-partition, it adds the <instance, state> pair
+ * to newAssignment if it's not there yet; in other word, the entries in
newAssignment won't be override.
+ * @param newAssignment newAssignment to merge, this map is getting updated
during this method.
+ * @param currentResourceAssignment the current resource assignment
+ * @param enabledLiveInstances the set of enabled live instance
+ */
+ public static void mergeAssignments(Map<String, ResourceAssignment>
newAssignment,
+ Map<String, ResourceAssignment> currentResourceAssignment,
+ Set<String> enabledLiveInstances) {
+ // merge with current assignment for partitions assigned on rest of the
instances (not immediately live)
+ currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+ String resourceName = entry.getKey();
+ ResourceAssignment currentAssignment = entry.getValue();
+ for (Partition partition : currentAssignment.getMappedPartitions()) {
+ currentAssignment.getReplicaMap(partition).entrySet().stream() //
<instance, state>
+ // the existing partitions on the enabledLiveInstances are
pre-allocated, only process for the rest
Review Comment:
Yes, that's a good point for simplifying the logic.
But one tradeoff is that we may loop through more elements and have a larger
memory footprint. The differences between newAssignment and currentAssignment
are the newly brought-up replicas on live instances and those on temporarily
down instances. The whole assignment map might be large, while the diff can
be small.
Let me know what you think about this. Thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]