desaikomal commented on code in PR #2189:
URL: https://github.com/apache/helix/pull/2189#discussion_r941424468
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -562,6 +590,57 @@ private Map<String, ResourceAssignment> partialRebalance(
}
_partialRebalanceLatency.endMeasuringLatency();
LOG.info("Finish calculating the new best possible assignment.");
+
+ if (isBestPossibleChanged) {
+ LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
+ RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+ }
+ }
+
+ private Map<String, ResourceAssignment> emergencyRebalance(
+ ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+ RebalanceAlgorithm algorithm)
+ throws HelixRebalanceException {
+ Map<String, ResourceAssignment> currentBestPossibleAssignment =
+ getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet());
+ ClusterModel clusterModel;
+ try {
+ clusterModel = ClusterModelProvider
+ .generateClusterModelForEmergencyRebalance(clusterData, resourceMap, activeNodes,
+ currentBestPossibleAssignment);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.",
+ HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+ }
+
+ // Only calculate if there are illegal placements, meaning the cluster model is non null;
+ // otherwise, start partial rebalance and return the current best possible.
+ if (clusterModel == null) {
+ // Perform partial rebalance for a new best possible assignment
+ partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+ return currentBestPossibleAssignment;
+ }
+
+ _asyncPartialRebalanceResult.cancel(true);
Review Comment:
+1
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -358,9 +361,10 @@ protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
throws HelixRebalanceException {
// Perform global rebalance for a new baseline assignment
globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
- // Perform partial rebalance for a new best possible assignment
+ // Perform emergency rebalance for a new best possible assignment
Review Comment:
Q: Looks like this is for case (1)?
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -562,6 +590,57 @@ private Map<String, ResourceAssignment> partialRebalance(
}
_partialRebalanceLatency.endMeasuringLatency();
LOG.info("Finish calculating the new best possible assignment.");
+
+ if (isBestPossibleChanged) {
+ LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
+ RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+ }
+ }
+
+ private Map<String, ResourceAssignment> emergencyRebalance(
+ ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+ RebalanceAlgorithm algorithm)
+ throws HelixRebalanceException {
+ Map<String, ResourceAssignment> currentBestPossibleAssignment =
+ getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet());
+ ClusterModel clusterModel;
+ try {
+ clusterModel = ClusterModelProvider
+ .generateClusterModelForEmergencyRebalance(clusterData, resourceMap, activeNodes,
+ currentBestPossibleAssignment);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.",
+ HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+ }
+
+ // Only calculate if there are illegal placements, meaning the cluster model is non null;
+ // otherwise, start partial rebalance and return the current best possible.
+ if (clusterModel == null) {
+ // Perform partial rebalance for a new best possible assignment
+ partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+ return currentBestPossibleAssignment;
+ }
+
+ _asyncPartialRebalanceResult.cancel(true);
Review Comment:
Do we cancel the earlier partialRebalance because the data has changed?
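For context on what the cancel does mechanically (not on why the PR does it, which is the open question): calling `Future.cancel(true)` on a task that is still running interrupts its worker thread, so a result computed from outdated cluster data is never delivered. The standalone sketch below uses hypothetical names (`StaleCalculationCancellation`, `asyncPartialResult`) in place of the PR's `_asyncPartialRebalanceResult`; the null check is an assumption, since this hunk does not show whether the PR's field can be null at that point.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for an async best-possible calculation; not the Helix class. */
public class StaleCalculationCancellation {
  private final ExecutorService calculateExecutor = Executors.newSingleThreadExecutor();
  // Hypothetical counterpart of _asyncPartialRebalanceResult.
  private Future<String> asyncPartialResult;

  /** Submits a slow calculation and returns once it has actually started running. */
  public void startPartialRebalance() {
    CountDownLatch running = new CountDownLatch(1);
    asyncPartialResult = calculateExecutor.submit(() -> {
      running.countDown();
      Thread.sleep(10_000); // stands in for an expensive assignment calculation
      return "assignment computed from outdated cluster data";
    });
    try {
      running.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Returns true if a still-running calculation was cancelled before it completed. */
  public boolean cancelPendingPartialRebalance() {
    // cancel(true) also interrupts the worker thread, so the Thread.sleep above ends early.
    // The null check covers the case where nothing has been submitted yet (assumption).
    return asyncPartialResult != null && asyncPartialResult.cancel(true);
  }

  public boolean isPendingResultCancelled() {
    return asyncPartialResult != null && asyncPartialResult.isCancelled();
  }

  public void shutdown() {
    calculateExecutor.shutdownNow();
  }

  public static void main(String[] args) {
    StaleCalculationCancellation rebalancer = new StaleCalculationCancellation();
    rebalancer.startPartialRebalance();
    System.out.println("cancelled: " + rebalancer.cancelPendingPartialRebalance());
    rebalancer.shutdown();
  }
}
```

If the task had already finished, `cancel(true)` would return false and the completed result would remain available, so cancellation only guards against in-flight work.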
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java:
##########
@@ -391,6 +406,34 @@ private static Set<AssignableReplica> findToBeAssignedReplicasByClusterChanges(
return toBeAssignedReplicas;
}
+ private static Set<AssignableReplica> findToBeAssignedReplicasIllegalPlacements(
+ Map<String, Set<AssignableReplica>> replicaMap, Set<String> activeInstances,
+ Map<String, ResourceAssignment> currentAssignment,
+ Map<String, Set<AssignableReplica>> allocatedReplicas) {
+ // For any replica tht does not exist in currentAssignment (new resources) or does not exist in
+ // active instances (down instances), return them.
+ Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+ for (String resourceName : replicaMap.keySet()) {
+ Map<String, Map<String, Set<String>>> currentPartitionStateMap =
+ getValidStateInstanceMap(currentAssignment.get(resourceName), activeInstances);
+ for (AssignableReplica replica : replicaMap.get(resourceName)) {
+ String partitionName = replica.getPartitionName();
+ String replicaState = replica.getReplicaState();
+ Set<String> currentAllocations =
+ currentPartitionStateMap.getOrDefault(partitionName, Collections.emptyMap())
+ .getOrDefault(replicaState, Collections.emptySet());
+ List<String> currentAllocationsList = new ArrayList<>(currentAllocations);
Review Comment:
I am sorry, but why are we creating this new List<>?
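One situation where the copy would be justified is if the remainder of the loop (cut off in this hunk) mutates or indexes the allocations: the Set returned by `getOrDefault` may be shared with `currentPartitionStateMap`, or be the immutable `Collections.emptySet()`. For read-only iteration the Set can be used directly. A small standalone contrast, with hypothetical helper names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helpers contrasting read-only use of a Set with a mutating use. */
public class CopyBeforeMutate {
  /** Read-only: iterating the set directly needs no copy. */
  public static int countActive(Set<String> allocations, Set<String> activeInstances) {
    int count = 0;
    for (String instance : allocations) {
      if (activeInstances.contains(instance)) {
        count++;
      }
    }
    return count;
  }

  /**
   * Mutating: copy first, because the set may be shared with the map it came from
   * or be the immutable Collections.emptySet() default.
   */
  public static List<String> withoutInstance(Set<String> allocations, String instance) {
    List<String> copy = new ArrayList<>(allocations);
    copy.remove(instance);
    return copy;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> stateMap =
        Collections.singletonMap("MASTER", new HashSet<>(Arrays.asList("host1", "host2")));
    Set<String> allocations = stateMap.getOrDefault("MASTER", Collections.emptySet());
    System.out.println(countActive(allocations, Collections.singleton("host1")));
    System.out.println(withoutInstance(allocations, "host1"));
    System.out.println(stateMap.get("MASTER").size()); // the source set is left untouched
  }
}
```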
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java:
##########
@@ -391,6 +406,34 @@ private static Set<AssignableReplica> findToBeAssignedReplicasByClusterChanges(
return toBeAssignedReplicas;
}
+ private static Set<AssignableReplica> findToBeAssignedReplicasIllegalPlacements(
Review Comment:
nit: It would be useful to have a high-level comment for this method.
If I understand correctly, it is trying to find 'currentIllegalPlacements'.
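As an illustration of the kind of method comment being requested, here is a toy re-implementation that uses plain strings instead of `AssignableReplica`/`ResourceAssignment`, with a Javadoc based on the in-code comment (new resources and replicas on down instances). The class, the simplified `Replica` type, and the "each replica consumes one current active host" rule are assumptions; the quoted hunk ends before the PR's loop body does, so this is not a description of the PR's actual logic.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class IllegalPlacementFinder {
  /** Hypothetical simplified replica: resource, partition, and the state it should be in. */
  public static final class Replica {
    final String resource;
    final String partition;
    final String state;

    public Replica(String resource, String partition, String state) {
      this.resource = resource;
      this.partition = partition;
      this.state = state;
    }

    @Override
    public String toString() {
      return resource + "/" + partition + "/" + state;
    }
  }

  /**
   * Finds the replicas whose current placement is illegal and which therefore must be
   * (re)assigned: replicas of resources absent from the current assignment (new resources)
   * and replicas not currently placed on an active instance (e.g. down instances).
   * All other replicas keep their current placement.
   *
   * @param replicaMap        replicas to place, keyed by resource name
   * @param activeInstances   instances that are currently live and enabled
   * @param currentAssignment resource -> partition -> state -> instances hosting it now
   * @return the replicas that need a new placement
   */
  public static Set<Replica> findToBeAssignedReplicas(
      Map<String, List<Replica>> replicaMap,
      Set<String> activeInstances,
      Map<String, Map<String, Map<String, Set<String>>>> currentAssignment) {
    Set<Replica> toBeAssigned = new HashSet<>();
    for (Map.Entry<String, List<Replica>> entry : replicaMap.entrySet()) {
      Map<String, Map<String, Set<String>>> partitionStateMap =
          currentAssignment.getOrDefault(entry.getKey(), Collections.emptyMap());
      // One pool of active hosts per (partition, state); each replica consumes one host,
      // so two replicas in the same state cannot both claim the same instance.
      Map<String, Deque<String>> pools = new HashMap<>();
      for (Replica replica : entry.getValue()) {
        Deque<String> pool = pools.computeIfAbsent(replica.partition + "|" + replica.state, key -> {
          Deque<String> activeHosts = new ArrayDeque<>();
          for (String instance : partitionStateMap
              .getOrDefault(replica.partition, Collections.emptyMap())
              .getOrDefault(replica.state, Collections.emptySet())) {
            if (activeInstances.contains(instance)) {
              activeHosts.add(instance);
            }
          }
          return activeHosts;
        });
        if (pool.poll() == null) {
          toBeAssigned.add(replica); // no active instance currently hosts this replica
        }
      }
    }
    return toBeAssigned;
  }
}
```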
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -534,8 +535,34 @@ private Map<String, ResourceAssignment> partialRebalance(
throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
- Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterModel, algorithm);
+ _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
Review Comment:
It would be useful to have a log message about calculating the emergency pipeline. (I know the
method already has a log statement, but being able to trace the call path will be useful.)
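A possible shape for this, sketched with `java.util.logging` only to stay dependency-free (the PR's own `LOG` field is not shown in this hunk): log at the emergency entry point, and pass the caller's name into the submitted task so the line logged on the executor thread says which pipeline triggered it. All class and method names here are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/** Hypothetical illustration of tagging async work with the pipeline that triggered it. */
public class TracedAsyncRebalance {
  private static final Logger LOG = Logger.getLogger(TracedAsyncRebalance.class.getName());
  // Copy of every logged message so the call path can be checked in tests.
  public final List<String> trace = new CopyOnWriteArrayList<>();
  private final ExecutorService calculateExecutor = Executors.newSingleThreadExecutor();

  private void log(String message) {
    trace.add(message);
    LOG.info(message);
  }

  /** Emergency entry point: logs before handing the work to the executor. */
  public Future<String> emergencyRebalance(String clusterName) {
    log("Emergency rebalance for cluster " + clusterName + ": scheduling partial rebalance.");
    return calculateExecutor.submit(() -> partialRebalance(clusterName, "emergencyRebalance"));
  }

  private String partialRebalance(String clusterName, String caller) {
    // The caller name travels into the worker thread, so this log line shows which
    // pipeline started the calculation even though it runs asynchronously.
    log("Start partial rebalance for cluster " + clusterName + " (triggered by " + caller + ").");
    return "new best possible assignment";
  }

  /** Stops accepting work and waits for queued calculations so the trace is complete. */
  public void shutdownAndAwait() {
    calculateExecutor.shutdown();
    try {
      calculateExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    TracedAsyncRebalance rebalancer = new TracedAsyncRebalance();
    rebalancer.emergencyRebalance("testCluster");
    rebalancer.shutdownAndAwait();
    rebalancer.trace.forEach(System.out::println);
  }
}
```

Passing the caller explicitly is a simple choice here; an alternative would be a logging context such as SLF4J's MDC, which would then need to be copied onto the executor thread by hand.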
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]