qqu0127 commented on code in PR #2189:
URL: https://github.com/apache/helix/pull/2189#discussion_r1028323985


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java:
##########
@@ -92,60 +93,79 @@ public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment()
   }
 
   /**
-   * @return true if a new baseline was persisted.
+   * @param newAssignment
+   * @param path the path of the assignment record
+   * @param key  the key of the assignment in the record
    * @throws HelixException if the method failed to persist the baseline.
    */
-  public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    return persistAssignment(globalBaseline, getBaseline(), _baselinePath, BASELINE_KEY);
+  private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> newAssignment, String path, String key)
+      throws HelixException {
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException(String.format("Failed to persist %s assignment to path %s", key, path), e);
+    }
   }
 
   /**
-   * @return true if a new best possible assignment was persisted.
-   * @throws HelixException if the method failed to persist the baseline.
+   * Persist a new baseline assignment to metadata store first, then to memory
+   * @param globalBaseline
    */
-  public synchronized boolean persistBestPossibleAssignment(
-      Map<String, ResourceAssignment> bestPossibleAssignment) {
-    return persistAssignment(bestPossibleAssignment, getBestPossibleAssignment(), _bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(globalBaseline, _baselinePath, BASELINE_KEY);
+    // write to memory
+    getBaseline().clear();
+    getBaseline().putAll(globalBaseline);
   }
 
-  public synchronized void clearAssignmentMetadata() {
-    persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath, BASELINE_KEY);
-    persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(), _bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  /**
+   * Persist a new best possible assignment to metadata store first, then to memory.
+   * Increment best possible version by 1 - this is a high priority in-memory write.
+   * @param bestPossibleAssignment
+   */
+  public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(bestPossibleAssignment, _bestPossiblePath, BEST_POSSIBLE_KEY);
+    // write to memory
+    getBestPossibleAssignment().clear();
+    getBestPossibleAssignment().putAll(bestPossibleAssignment);
+    _bestPossibleVersion++;
   }
 
   /**
-   * @param newAssignment
-   * @param cachedAssignment
-   * @param path the path of the assignment record
-   * @param key  the key of the assignment in the record
-   * @return true if a new assignment was persisted.
+   * Attempts to persist Best Possible Assignment in memory from an asynchronous thread.
+   * Persist only happens when the provided version is not stale - this is a low priority in-memory write.
+   * @param bestPossibleAssignment - new assignment to be persisted
+   * @param newVersion - attempted new version to write. This version is obtained earlier from getBestPossibleVersion()
+   * @return true if the attempt succeeded, false otherwise.
    */
-  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
-  // TODO: when it is skipped.
-  private boolean persistAssignment(Map<String, ResourceAssignment> newAssignment,
-      Map<String, ResourceAssignment> cachedAssignment, String path,
-      String key) {
-    // TODO: Make the write async?
-    // If the assignment hasn't changed, skip writing to metadata store
-    if (compareAssignments(cachedAssignment, newAssignment)) {
-      return false;
-    }
-    // Persist to ZK
-    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
-    try {
-      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
-    } catch (IOException e) {
-      // TODO: Improve failure handling
-      throw new HelixException(
-          String.format("Failed to persist %s assignment to path %s", key, path), e);
+  public synchronized boolean asyncPersistBestPossibleAssignmentInMemory(

Review Comment:
   "Persist in memory" reads a little oddly. Maybe `refreshBestPossibleAssignmentCache` would be a better name?

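A minimal sketch of the version check that the Javadoc describes, using the suggested name `refreshBestPossibleAssignmentCache`. It assumes a low-priority write should be applied only if no other write has bumped the version since the caller read it with `getBestPossibleVersion()`; the exact semantics of `newVersion` in the PR may differ. `BestPossibleCache` is a hypothetical stand-in for `AssignmentMetadataStore`, with `String` in place of `ResourceAssignment` and the metadata-store write omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AssignmentMetadataStore; String replaces ResourceAssignment
// and the metadata-store (ZooKeeper) write is omitted.
class BestPossibleCache {
  private final Map<String, String> _bestPossible = new HashMap<>();
  private long _bestPossibleVersion = 0;

  synchronized long getBestPossibleVersion() {
    return _bestPossibleVersion;
  }

  // High-priority write (main pipeline): always applied, and bumps the version.
  synchronized void persistBestPossibleAssignment(Map<String, String> assignment) {
    _bestPossible.clear();
    _bestPossible.putAll(assignment);
    _bestPossibleVersion++;
  }

  // Low-priority write (async thread): applied only if nobody has written since
  // the caller read expectedVersion via getBestPossibleVersion().
  synchronized boolean refreshBestPossibleAssignmentCache(
      Map<String, String> assignment, long expectedVersion) {
    if (expectedVersion != _bestPossibleVersion) {
      return false; // stale: a newer assignment is already cached
    }
    _bestPossible.clear();
    _bestPossible.putAll(assignment);
    _bestPossibleVersion++;
    return true;
  }

  synchronized Map<String, String> snapshot() {
    return new HashMap<>(_bestPossible);
  }
}

class BestPossibleCacheDemo {
  public static void main(String[] args) {
    BestPossibleCache cache = new BestPossibleCache();
    long seen = cache.getBestPossibleVersion();             // async thread reads version 0
    cache.persistBestPossibleAssignment(Map.of("db", "A")); // main pipeline writes; version is now 1
    boolean applied = cache.refreshBestPossibleAssignmentCache(Map.of("db", "B"), seen);
    System.out.println(applied + " " + cache.snapshot());   // prints "false {db=A}"
  }
}
```

Here the async thread read version 0 before the main pipeline wrote version 1, so its stale result is dropped and the cache keeps `{db=A}`.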


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -501,21 +512,64 @@ private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgo
     _baselineCalcLatency.endMeasuringLatency();
     LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
 
-    if (isBaselineChanged && shouldSchedulePartialRebalance) {
+    if (isBaselineChanged && shouldTriggerMainPipeline) {
       LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
-      RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
     }
   }
 
-  private Map<String, ResourceAssignment> partialRebalance(
+  private void partialRebalance(

Review Comment:
   General comment on the overall structure, open to discussion:
   Can we make partial rebalance a standalone class? The `WagedRebalancer` class is getting more and more complicated, and partial rebalance is now more like a separate module with its own computation logic and execution context.
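A rough sketch of the suggested split, assuming the standalone class owns the execution context (the async flag and a dedicated executor) and receives the computation and the result publisher as arguments. `PartialRebalancer`, its methods, and the single-thread executor are hypothetical, not part of the PR; the `Supplier` and `Consumer` stand in for the rebalance algorithm call and the write to `AssignmentMetadataStore`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical standalone home for partial rebalance: it owns its execution
// context (async flag + executor) instead of sharing fields with WagedRebalancer.
class PartialRebalancer<R> {
  private final boolean _asyncEnabled;
  private final ExecutorService _executor;

  PartialRebalancer(boolean asyncEnabled) {
    _asyncEnabled = asyncEnabled;
    // Only allocate a thread when async mode is actually used.
    _executor = asyncEnabled ? Executors.newSingleThreadExecutor() : null;
  }

  // Runs the computation and hands the result to the publisher, either inline
  // (sync mode, returns null) or on the dedicated thread (async mode, returns its Future).
  Future<?> rebalance(Supplier<R> computation, Consumer<R> publisher) {
    if (!_asyncEnabled) {
      publisher.accept(computation.get());
      return null;
    }
    return _executor.submit(() -> publisher.accept(computation.get()));
  }

  // Stops accepting new work; already-submitted work still runs to completion.
  void close() {
    if (_executor != null) {
      _executor.shutdown();
    }
  }
}
```

With this shape, `WagedRebalancer` would no longer hold the executor or the async flag itself; it would only decide when to call `rebalance(...)`.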



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java:
##########
@@ -92,60 +93,79 @@ public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment()
   }
 
   /**
-   * @return true if a new baseline was persisted.
+   * @param newAssignment
+   * @param path the path of the assignment record
+   * @param key  the key of the assignment in the record
    * @throws HelixException if the method failed to persist the baseline.
    */
-  public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    return persistAssignment(globalBaseline, getBaseline(), _baselinePath, BASELINE_KEY);
+  private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> newAssignment, String path, String key)
+      throws HelixException {
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException(String.format("Failed to persist %s assignment to path %s", key, path), e);

Review Comment:
   Just curious: what do you have in mind for better failure handling?
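One possible direction, purely hypothetical and not taken from the PR: retry transient `IOException`s a bounded number of times with a growing delay before surfacing the failure. `RetryingWriter` and `IoAction` are made-up names, and `IllegalStateException` stands in for `HelixException` so the sketch compiles on its own.

```java
import java.io.IOException;

// Hypothetical helper: bounded retry with linear backoff around an I/O write.
class RetryingWriter {
  // Stand-in for a call such as _dataAccessor.compressedBucketWrite(path, record).
  interface IoAction {
    void run() throws IOException;
  }

  static void writeWithRetry(IoAction write, int maxAttempts, long backoffMs) {
    IOException last = null;
    int attempt = 0;
    while (attempt < maxAttempts) {
      attempt++;
      try {
        write.run();
        return; // success, nothing to report
      } catch (IOException e) {
        last = e; // remember the most recent failure as the cause
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(backoffMs * attempt); // wait a little longer after each failure
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
          break;
        }
      }
    }
    // Stand-in for HelixException in this sketch.
    throw new IllegalStateException(
        "Failed to persist assignment after " + attempt + " attempt(s)", last);
  }
}
```

Whether retrying is safe here depends on the write being idempotent for the same record, which this sketch does not check.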



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -501,21 +512,64 @@ private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgo
     _baselineCalcLatency.endMeasuringLatency();
     LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
 
-    if (isBaselineChanged && shouldSchedulePartialRebalance) {
+    if (isBaselineChanged && shouldTriggerMainPipeline) {
       LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
-      RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
     }
   }
 
-  private Map<String, ResourceAssignment> partialRebalance(
+  private void partialRebalance(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
       RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    // If partial rebalance is async and the previous result is not completed yet,
+    // do not start another partial rebalance.
+    if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
+        && !_asyncPartialRebalanceResult.isDone()) {
+      return;

Review Comment:
   For my own understanding, does this mean emergency rebalances will happen more frequently?
   
   Also, nit: let's log something here.
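A minimal sketch of the early-return guard from the diff with the requested log line added. The field names mirror the diff; the wrapper class `AsyncPartialRebalanceGuard` and its `tryStart`/`track` methods are hypothetical, and `java.util.logging` is used only to keep the example self-contained (the real class already has its own `LOG`).

```java
import java.util.concurrent.Future;
import java.util.logging.Logger;

// Hypothetical wrapper around the "skip if the previous async run is unfinished" check.
class AsyncPartialRebalanceGuard {
  private static final Logger LOG =
      Logger.getLogger(AsyncPartialRebalanceGuard.class.getName());

  private final boolean _asyncPartialRebalanceEnabled;
  private Future<?> _asyncPartialRebalanceResult;

  AsyncPartialRebalanceGuard(boolean asyncEnabled) {
    _asyncPartialRebalanceEnabled = asyncEnabled;
  }

  // Returns true if a new partial rebalance may start, false if the previous
  // asynchronous one is still running (and logs why it was skipped).
  synchronized boolean tryStart() {
    if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
        && !_asyncPartialRebalanceResult.isDone()) {
      LOG.info("Skip partial rebalance: the previous asynchronous partial rebalance "
          + "has not completed yet.");
      return false;
    }
    return true;
  }

  // Records the Future of the run that was just submitted.
  synchronized void track(Future<?> result) {
    _asyncPartialRebalanceResult = result;
  }
}
```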



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java:
##########
@@ -206,10 +226,18 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
    * @param newAssignment
    * @return true if they are the same. False otherwise or oldAssignment is null
    */
-  protected boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
+  private boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
       Map<String, ResourceAssignment> newAssignment) {
     // If oldAssignment is null, that means that we haven't read from/written to
     // the metadata store yet. In that case, we return false so that we write to metadata store.
     return oldAssignment != null && oldAssignment.equals(newAssignment);
   }
+
+  protected boolean compareBaseline(Map<String, ResourceAssignment> newBaseline) {

Review Comment:
   nit: with "compare" in the name, it's unclear what the return value means. Let's add a comment here, or rename the method explicitly to `isBaselineChanged` or similar.
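A sketch of the rename with a Javadoc that spells out the return value. It assumes `isBaselineChanged` is the negation of `compareAssignments(getBaseline(), newBaseline)`, so a baseline that has never been read or written (null) counts as changed, matching the comment in `compareAssignments`. `BaselineStore` is a hypothetical stand-in, with `String` in place of `ResourceAssignment`.

```java
import java.util.Map;

// Hypothetical stand-in for AssignmentMetadataStore's baseline handling.
class BaselineStore {
  // null until the baseline has been read from or written to the metadata store.
  private Map<String, String> _baseline;

  /**
   * @param newBaseline the newly calculated baseline
   * @return true if newBaseline differs from the cached baseline, or if no baseline
   *         has been cached yet (so the caller should persist it); false otherwise
   */
  boolean isBaselineChanged(Map<String, String> newBaseline) {
    return _baseline == null || !_baseline.equals(newBaseline);
  }

  void setBaseline(Map<String, String> baseline) {
    _baseline = baseline;
  }
}
```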



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java:
##########
@@ -51,7 +51,30 @@ private enum RebalanceScopeType {
     PARTIAL,
     // Set the rebalance scope to cover all replicas that need relocation based on the cluster
     // changes.
-    GLOBAL_BASELINE
+    GLOBAL_BASELINE,
+    EMERGENCY

Review Comment:
   nit: some comments here would be great.



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java:
##########
@@ -391,6 +418,46 @@ private static Set<AssignableReplica> findToBeAssignedReplicasByClusterChanges(
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Find replicas that were assigned to non-active nodes in the current assignment.
+   *
+   * @param replicaMap             A map contains all the replicas grouped by resource name.
+   * @param activeInstances        All the instances that are live and enabled according to the delay rebalance configuration.
+   * @param currentAssignment      The current assignment that was generated in the previous rebalance.
+   * @param allocatedReplicas      A map of <Instance -> replicas> to return the allocated replicas grouped by the target instance name.
+   * @return The replicas that need to be reassigned.
+   */
+  private static Set<AssignableReplica> findToBeAssignedReplicasOnDownInstances(
+      Map<String, Set<AssignableReplica>> replicaMap, Set<String> activeInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    // For any replica that are assigned to non-active instances (down instances), add them.
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    for (String resourceName : replicaMap.keySet()) {
+      Map<String, Map<String, Set<String>>> stateInstanceMap = getStateInstanceMap(currentAssignment.get(resourceName));
+
+      for (AssignableReplica replica : replicaMap.get(resourceName)) {
+        String partitionName = replica.getPartitionName();
+        String replicaState = replica.getReplicaState();
+        Set<String> currentAllocations =
+            stateInstanceMap.getOrDefault(partitionName, Collections.emptyMap())
+                .getOrDefault(replicaState, Collections.emptySet());
+        if (!currentAllocations.isEmpty()) {
+          String allocatedInstance = currentAllocations.iterator().next();
+          if (activeInstances.contains(allocatedInstance)) {
+            allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new HashSet<>()).add(replica);

Review Comment:
   For my own understanding, do we only check the first allocatedInstance? Is there any chance the set has more than one element?
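To illustrate the question: for a state such as `SLAVE`, the per-partition set can hold several instances, and `iterator().next()` only ever sees one of them (an arbitrary one for a `HashSet`). The sketch below uses plain strings instead of `AssignableReplica` and hypothetical instance names. The first method reads without consuming, so two replicas of the same state both match the same instance; the second removes each matched instance so the next replica gets a different one. Whether the PR's method does something like the latter is not visible in the quoted hunk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration: N replicas of one partition share a state, and
// `allocations` is that state's set of currently assigned instances.
class ReplicaMatchingDemo {

  // Reads the first element without consuming it: every replica of the same
  // state is matched to the same instance.
  static List<String> matchWithoutConsuming(int replicaCount, Set<String> allocations) {
    List<String> matched = new ArrayList<>();
    for (int i = 0; i < replicaCount; i++) {
      matched.add(allocations.isEmpty() ? null : allocations.iterator().next());
    }
    return matched;
  }

  // Consumes each matched instance, so the next replica of the same state is
  // matched to a different one; null means no current allocation is left.
  static List<String> matchConsuming(int replicaCount, Set<String> allocations) {
    Set<String> remaining = new LinkedHashSet<>(allocations); // copy: don't mutate the input
    List<String> matched = new ArrayList<>();
    for (int i = 0; i < replicaCount; i++) {
      Iterator<String> it = remaining.iterator();
      if (it.hasNext()) {
        matched.add(it.next());
        it.remove();
      } else {
        matched.add(null); // this replica would need to be (re)assigned
      }
    }
    return matched;
  }
}
```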



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java:
##########
@@ -92,60 +93,79 @@ public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment()
   }
 
   /**
-   * @return true if a new baseline was persisted.
+   * @param newAssignment
+   * @param path the path of the assignment record
+   * @param key  the key of the assignment in the record
    * @throws HelixException if the method failed to persist the baseline.
    */
-  public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    return persistAssignment(globalBaseline, getBaseline(), _baselinePath, BASELINE_KEY);
+  private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> newAssignment, String path, String key)
+      throws HelixException {
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException(String.format("Failed to persist %s assignment to path %s", key, path), e);
+    }
   }
 
   /**
-   * @return true if a new best possible assignment was persisted.
-   * @throws HelixException if the method failed to persist the baseline.
+   * Persist a new baseline assignment to metadata store first, then to memory
+   * @param globalBaseline
    */
-  public synchronized boolean persistBestPossibleAssignment(
-      Map<String, ResourceAssignment> bestPossibleAssignment) {
-    return persistAssignment(bestPossibleAssignment, getBestPossibleAssignment(), _bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(globalBaseline, _baselinePath, BASELINE_KEY);
+    // write to memory
+    getBaseline().clear();
+    getBaseline().putAll(globalBaseline);
   }
 
-  public synchronized void clearAssignmentMetadata() {
-    persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath, BASELINE_KEY);
-    persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(), _bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  /**
+   * Persist a new best possible assignment to metadata store first, then to memory.
+   * Increment best possible version by 1 - this is a high priority in-memory write.
+   * @param bestPossibleAssignment
+   */
+  public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(bestPossibleAssignment, _bestPossiblePath, BEST_POSSIBLE_KEY);
+    // write to memory
+    getBestPossibleAssignment().clear();
+    getBestPossibleAssignment().putAll(bestPossibleAssignment);
+    _bestPossibleVersion++;
   }
 
   /**
-   * @param newAssignment
-   * @param cachedAssignment
-   * @param path the path of the assignment record
-   * @param key  the key of the assignment in the record
-   * @return true if a new assignment was persisted.
+   * Attempts to persist Best Possible Assignment in memory from an asynchronous thread.
+   * Persist only happens when the provided version is not stale - this is a low priority in-memory write.
+   * @param bestPossibleAssignment - new assignment to be persisted
+   * @param newVersion - attempted new version to write. This version is obtained earlier from getBestPossibleVersion()
+   * @return true if the attempt succeeded, false otherwise.
    */
-  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
-  // TODO: when it is skipped.
-  private boolean persistAssignment(Map<String, ResourceAssignment> newAssignment,
-      Map<String, ResourceAssignment> cachedAssignment, String path,
-      String key) {
-    // TODO: Make the write async?
-    // If the assignment hasn't changed, skip writing to metadata store
-    if (compareAssignments(cachedAssignment, newAssignment)) {
-      return false;
-    }
-    // Persist to ZK
-    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
-    try {
-      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
-    } catch (IOException e) {
-      // TODO: Improve failure handling
-      throw new HelixException(
-          String.format("Failed to persist %s assignment to path %s", key, path), e);
+  public synchronized boolean asyncPersistBestPossibleAssignmentInMemory(

Review Comment:
   Would a finer-scoped lock work here? The same question applies to the other methods.
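A minimal sketch of what finer-scoped locking could look like, assuming baseline and best-possible state are independent: each gets its own lock object, so a slow baseline write to the metadata store no longer blocks best-possible updates. `SplitLockAssignmentStore` and the stubbed `writeToMetadataStore` are hypothetical; `String` stands in for `ResourceAssignment`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store with one lock per assignment type instead of `synchronized` methods.
class SplitLockAssignmentStore {
  private final Object _baselineLock = new Object();
  private final Object _bestPossibleLock = new Object();
  private final Map<String, String> _baseline = new HashMap<>();     // guarded by _baselineLock
  private final Map<String, String> _bestPossible = new HashMap<>(); // guarded by _bestPossibleLock
  private long _bestPossibleVersion = 0;                             // guarded by _bestPossibleLock

  void persistBaseline(Map<String, String> baseline) {
    synchronized (_baselineLock) {
      writeToMetadataStore("BASELINE", baseline); // metadata store first, then memory
      _baseline.clear();
      _baseline.putAll(baseline);
    }
  }

  void persistBestPossibleAssignment(Map<String, String> bestPossible) {
    synchronized (_bestPossibleLock) {
      writeToMetadataStore("BEST_POSSIBLE", bestPossible);
      _bestPossible.clear();
      _bestPossible.putAll(bestPossible);
      _bestPossibleVersion++;
    }
  }

  Map<String, String> getBaseline() {
    synchronized (_baselineLock) {
      return new HashMap<>(_baseline); // copy, so callers never read outside the lock
    }
  }

  Map<String, String> getBestPossibleAssignment() {
    synchronized (_bestPossibleLock) {
      return new HashMap<>(_bestPossible);
    }
  }

  long getBestPossibleVersion() {
    synchronized (_bestPossibleLock) {
      return _bestPossibleVersion;
    }
  }

  // Stub for the ZooKeeper write; a real implementation would do I/O here.
  private void writeToMetadataStore(String key, Map<String, String> assignment) {
  }
}
```

One design choice to note: the getters here return copies, whereas the quoted code mutates the maps returned by `getBaseline()` and `getBestPossibleAssignment()` in place. Returning the live map would let callers read it outside the lock.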


