jiajunwang commented on a change in pull request #639: Refine the WAGED
rebalancer to minimize the partial rebalance workload.
URL: https://github.com/apache/helix/pull/639#discussion_r355067704
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
##########
@@ -389,50 +376,89 @@ public void close() {
return finalIdealStateMap;
}
- // TODO make the Baseline calculation async if complicated algorithm is used
for the Baseline
- private void refreshBaseline(ResourceControllerDataProvider clusterData,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String,
Resource> resourceMap,
- final CurrentStateOutput currentStateOutput)
+ /**
+ * Global rebalance calculates for a new baseline assignment.
+ * The new baseline assignment will be persisted and leveraged by the
partial rebalance.
+ * @param clusterData
+ * @param resourceMap
+ * @param currentStateOutput
+ * @param algorithm
+ * @throws HelixRebalanceException
+ */
+ private void globalRebalance(ResourceControllerDataProvider clusterData,
+ Map<String, Resource> resourceMap, final CurrentStateOutput
currentStateOutput,
+ RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
+ _changeDetector.updateSnapshots(clusterData);
+ // Get all the changed items' information. Filter for the items that have
content changed.
+ final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
+ _changeDetector.getChangeTypes().stream()
+ .collect(Collectors.toMap(changeType -> changeType, changeType -> {
+ Set<String> itemKeys = new HashSet<>();
+ itemKeys.addAll(_changeDetector.getAdditionsByType(changeType));
+ itemKeys.addAll(_changeDetector.getChangesByType(changeType));
+ itemKeys.addAll(_changeDetector.getRemovalsByType(changeType));
+ return itemKeys;
+ })).entrySet().stream().filter(changeEntry ->
!changeEntry.getValue().isEmpty())
+ .collect(Collectors
+ .toMap(changeEntry -> changeEntry.getKey(), changeEntry ->
changeEntry.getValue()));
+
if (clusterChanges.keySet().stream()
.anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
- LOG.info("Start calculating the new baseline.");
- _globalBaselineCalcCounter.increment(1L);
- _globalBaselineCalcLatency.startMeasuringLatency();
-
- // For baseline calculation
+ // Build the cluster model for rebalance calculation.
+ // Note, for a Baseline calculation,
// 1. Ignore node status (disable/offline).
- // 2. Use the baseline as the previous best possible assignment since
there is no "baseline" for
- // the baseline.
- // Read the baseline from metadata store
+ // 2. Use the previous Baseline as the only parameter about the previous
assignment.
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
- Map<String, ResourceAssignment> newBaseline =
- calculateAssignment(clusterData, clusterChanges, resourceMap,
- clusterData.getAllInstances(), Collections.emptyMap(),
currentBaseline);
-
- // Write the new baseline to metadata store
- if (_assignmentMetadataStore != null) {
- try {
- _writeLatency.startMeasuringLatency();
- _assignmentMetadataStore.persistBaseline(newBaseline);
- _writeLatency.endMeasuringLatency();
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to persist the new
baseline assignment.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
- } else {
- LOG.debug("Assignment Metadata Store is empty. Skip persist the
baseline assignment.");
+ ClusterModel clusterModel;
+ try {
+ clusterModel = ClusterModelProvider
+ .generateClusterModelForBaseline(clusterData, resourceMap,
+ clusterData.getAllInstances(), clusterChanges,
currentBaseline);
+ } catch (Exception ex) {
Review comment:
I agree that the general Exception is not the best design, but there is a
requirement to wrap the exceptions here. For example, we throw exceptions when
the IS node cannot be found, or weight config is negative. For those exception,
the provider throws with the detail exception, but the rebalancer needs to wrap
it for rebalance failure reporting.
One way to make it cleaner is explicitly listing all possible exceptions.
But since we are just wrapping here, I feel that would be too much verbose
code.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]