jiajunwang commented on a change in pull request #639: Refine the WAGED 
rebalancer to minimize the partial rebalance workload.
URL: https://github.com/apache/helix/pull/639#discussion_r352874065
 
 

 ##########
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 ##########
 @@ -389,50 +376,81 @@ public void close() {
     return finalIdealStateMap;
   }
 
-  // TODO make the Baseline calculation async if complicated algorithm is used 
for the Baseline
-  private void refreshBaseline(ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, 
Resource> resourceMap,
-      final CurrentStateOutput currentStateOutput)
+  private void globalRebalance(ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap, final CurrentStateOutput 
currentStateOutput,
+      RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    _changeDetector.updateSnapshots(clusterData);
+    // Get all the changed items' information. Filter for the items that have 
content changed.
+    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
+        _changeDetector.getChangeTypes().stream()
+            .collect(Collectors.toMap(changeType -> changeType, changeType -> {
+              Set<String> itemKeys = new HashSet<>();
+              itemKeys.addAll(_changeDetector.getAdditionsByType(changeType));
+              itemKeys.addAll(_changeDetector.getChangesByType(changeType));
+              itemKeys.addAll(_changeDetector.getRemovalsByType(changeType));
+              return itemKeys;
+            })).entrySet().stream().filter(changeEntry -> 
!changeEntry.getValue().isEmpty())
+            .collect(Collectors
+                .toMap(changeEntry -> changeEntry.getKey(), changeEntry -> 
changeEntry.getValue()));
+
     if (clusterChanges.keySet().stream()
         .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
-      LOG.info("Start calculating the new baseline.");
-      _globalBaselineCalcCounter.increment(1L);
-      _globalBaselineCalcLatency.startMeasuringLatency();
-
-      // For baseline calculation
+      // Build the cluster model for rebalance calculation.
+      // Note, for a baseline calculation,
       // 1. Ignore node status (disable/offline).
-      // 2. Use the baseline as the previous best possible assignment since 
there is no "baseline" for
-      // the baseline.
-      // Read the baseline from metadata store
+      // 2. Use the baseline as the previous best possible assignment since 
there is no "baseline"
+      // for the baseline.
       Map<String, ResourceAssignment> currentBaseline =
           getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet());
-      Map<String, ResourceAssignment> newBaseline =
-          calculateAssignment(clusterData, clusterChanges, resourceMap,
-              clusterData.getAllInstances(), Collections.emptyMap(), 
currentBaseline);
-
-      // Write the new baseline to metadata store
-      if (_assignmentMetadataStore != null) {
-        try {
-          _writeLatency.startMeasuringLatency();
-          _assignmentMetadataStore.persistBaseline(newBaseline);
-          _writeLatency.endMeasuringLatency();
-        } catch (Exception ex) {
-          throw new HelixRebalanceException("Failed to persist the new 
baseline assignment.",
-              HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-        }
-      } else {
-        LOG.debug("Assignment Metadata Store is empty. Skip persist the 
baseline assignment.");
+      ClusterModel clusterModel;
+      try {
+        clusterModel = ClusterModelProvider
+            .generateClusterModelForBaseline(clusterData, resourceMap,
+                clusterData.getAllInstances(), clusterChanges, 
Collections.emptyMap(),
+                currentBaseline);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to generate cluster model 
for global rebalance.",
+            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+      }
+
+      refreshBaseline(clusterModel, algorithm);
+    }
+  }
+
+  /**
+   * Calculate and update the Baseline assignment
+   * @param clusterModel
+   * @throws HelixRebalanceException
+   */
+  private void refreshBaseline(ClusterModel clusterModel, RebalanceAlgorithm 
algorithm)
+      throws HelixRebalanceException {
+    LOG.info("Start calculating the new baseline.");
+    _globalBaselineCalcCounter.increment(1L);
+    _globalBaselineCalcLatency.startMeasuringLatency();
+
+    Map<String, ResourceAssignment> newBaseline = 
calculateAssignment(clusterModel, algorithm);
+    // Write the new baseline to metadata store
+    if (_assignmentMetadataStore != null) {
+      try {
+        _writeLatency.startMeasuringLatency();
+        _assignmentMetadataStore.persistBaseline(newBaseline);
+        _writeLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new baseline 
assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
       }
-      _globalBaselineCalcLatency.endMeasuringLatency();
-      LOG.info("Finish calculating the new baseline.");
+    } else {
+      LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline 
assignment.");
 
 Review comment:
   As you may recall, this comes from an earlier change of ours: the store might be 
null when running in tests.
   I will fix the log message.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to