junkaixue commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173173685


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -385,6 +375,63 @@ protected List<HelixRebalanceException.Type> failureTypesToPropagate() {
     return FAILURE_TYPES_TO_PROPAGATE;
   }
 
+  /**
+   * Some partition may fail to meet minActiveReplica due to delayed rebalance, because some instances are offline yet
+   * active. In this case, additional replicas have to be brought up -- until either the instance gets back, or timeout,
+   * at which we have a more permanent resolution.
+   * The term "overwrite" is inherited from historical approach, however, it's no longer technically an overwrite.
+   * It's a formal rebalance process that goes through the algorithm and all constraints.
+   * @param clusterData Cluster data cache
+   * @param resourceMap The map of resource to calculate
+   * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
+   *                    delayed rebalance config
+   * @param currentResourceAssignment The current resource assignment or the best possible assignment computed from last
+   *                           emergency rebalance.
+   * @param algorithm The rebalance algorithm
+   * @return The resource assignment with delayed rebalance minActiveReplica
+   */
+  private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap,
+      Set<String> activeNodes,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+    // the "real" live nodes at the time
+    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
+      // no need for additional process, return the current resource assignment
+      return currentResourceAssignment;
+    }
+    _rebalanceOverwriteCounter.increment(1L);
+    _rebalanceOverwriteLatency.startMeasuringLatency();
+    LOG.info("Start delayed rebalance overwrites in emergency rebalance.");
+    try {
+      // use the "real" live and enabled instances for calculation
+      ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(
+          clusterData, resourceMap, enabledLiveInstances, currentResourceAssignment);
+      Map<String, ResourceAssignment> assignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);

Review Comment:
   NIT: no further usage of clusterModel. You can merge these two lines.
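   The javadoc above describes when this path runs. A partition can fall below minActiveReplica because some of its replicas sit on instances that are offline yet still counted as active within the delay window. The sketch below shows that trigger and mirrors the early-return condition in the diff. It makes several assumptions:
   - It uses a simplified partition -> (instance -> state) map in place of Helix's ResourceAssignment.
   - The names `MinActiveReplicaCheck`, `requiresOverwrite` and `shouldHandle` are hypothetical.
   - It is not Helix's `requireRebalanceOverwrite`, whose exact checks are not shown in this diff.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Helix code: models one resource as
// partition -> (instance -> state).
final class MinActiveReplicaCheck {

  // True if any partition has fewer than minActiveReplica replicas placed on
  // enabled live instances (replicas on offline-yet-active nodes do not count).
  static boolean requiresOverwrite(Map<String, Map<String, String>> partitionToInstanceState,
      Set<String> enabledLiveInstances, int minActiveReplica) {
    for (Map<String, String> instanceState : partitionToInstanceState.values()) {
      long liveReplicas = instanceState.keySet().stream()
          .filter(enabledLiveInstances::contains)
          .count();
      if (liveReplicas < minActiveReplica) {
        return true;
      }
    }
    return false;
  }

  // Mirrors the early return in the diff: handle only when some active node is not
  // actually live AND some partition is short of minActiveReplica.
  static boolean shouldHandle(Set<String> activeNodes, Set<String> enabledLiveInstances,
      Map<String, Map<String, String>> partitionToInstanceState, int minActiveReplica) {
    return !activeNodes.equals(enabledLiveInstances)
        && requiresOverwrite(partitionToInstanceState, enabledLiveInstances, minActiveReplica);
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> assignment =
        Map.of("p0", Map.of("host1", "ONLINE", "host2", "ONLINE", "host3", "ONLINE"));
    // host2 and host3 are offline but still "active" inside the delay window.
    Set<String> activeNodes = Set.of("host1", "host2", "host3");
    Set<String> enabledLive = Set.of("host1");
    System.out.println(shouldHandle(activeNodes, enabledLive, assignment, 2)); // prints true
  }
}
```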



##########
helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java:
##########
@@ -177,12 +177,8 @@ public static void scheduleOnDemandPipeline(String clusterName, long delay) {
 
   public static void scheduleOnDemandPipeline(String clusterName, long delay,
       boolean shouldRefreshCache) {
-    if (clusterName == null) {
-      LOG.error("Failed to issue a pipeline run. ClusterName is null.");
-      return;
-    }
-    if (delay < 0L) {
-      LOG.error("Failed to issue a pipeline run. Delay is invalid.");
+    if (clusterName == null || delay < 0L) {
+      LOG.warn("ClusterName is null or delay is invalid, skip the pipeline issuing.");

Review Comment:
   It's OK to merge the two checks, but please print the values out. Otherwise, when debugging, it will be hard for us to tell which one caused it.
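   The sketch below shows one way to follow this suggestion. It keeps the single merged guard but puts both values into the message, so the log shows which condition fired. It uses `java.util.logging` only to stay self-contained, and the names `PipelineArgsCheck`, `validate` and `scheduleIfValid` are hypothetical.

```java
import java.util.logging.Logger;

// Hypothetical sketch of the merged guard in scheduleOnDemandPipeline.
final class PipelineArgsCheck {
  private static final Logger LOG = Logger.getLogger(PipelineArgsCheck.class.getName());

  // Returns null when the arguments are valid; otherwise a message that names
  // both values, so a reader of the log can see which condition failed.
  static String validate(String clusterName, long delay) {
    if (clusterName == null || delay < 0L) {
      return String.format(
          "ClusterName is null or delay is invalid, skip the pipeline issuing. clusterName: %s, delay: %d",
          clusterName, delay);
    }
    return null;
  }

  // Hypothetical caller shaped like the guard in the diff.
  static boolean scheduleIfValid(String clusterName, long delay) {
    String error = validate(clusterName, delay);
    if (error != null) {
      LOG.warning(error);
      return false;
    }
    // The real method would schedule the pipeline run here.
    return true;
  }

  public static void main(String[] args) {
    System.out.println(validate(null, 10L));
  }
}
```

   If the class logs through SLF4J, the same idea is a parameterized call such as `LOG.warn("ClusterName is null or delay is invalid, skip the pipeline issuing. clusterName: {}, delay: {}", clusterName, delay)`.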



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -311,30 +311,70 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
 
     for (String resourceName : resources) {
-      // <partition, <state, instances set>>
-      Map<String, Map<String, Set<String>>> stateInstanceMap =
-          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
       ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
-      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      IdealState idealState = clusterData.getIdealState(resourceName);
+      String modelDef = idealState.getStateModelDefRef();
       Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      ResourceConfig mergedResourceConfig =
+          ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), idealState);

Review Comment:
   Is this because of user fixed assignment?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -335,6 +335,42 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other word, the entries in newAssignment won't be override.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest
+            .filter(e -> !enabledLiveInstances.contains(e.getKey()) ||
+                !newAssignment.containsKey(resourceName) ||
+                !newAssignment.get(resourceName).getReplicaMap(partition).containsKey(e.getKey()))

Review Comment:
   I know the computed result merges the two of them. For the input, I would like newAssignment to contain just the single replicas that need to be reassigned.
   
   The reason is that this logic is very complicated, and I am thinking about the best way to merge them.
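   To make the merge semantics concrete, here is a sketch of what the `mergeAssignments` javadoc states. For each resource-partition, an <instance, state> pair from the current assignment is added only if newAssignment does not already have that instance, so computed entries are never overwritten. The sketch rests on a few assumptions:
   - It uses nested maps (resource -> partition -> instance -> state) with mutable inner maps instead of `ResourceAssignment`.
   - The class name `AssignmentMergeSketch` is hypothetical.
   - It does not model the `enabledLiveInstances` filter from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Helix code.
// Both maps are shaped as resource -> partition -> instance -> state.
final class AssignmentMergeSketch {

  static void merge(Map<String, Map<String, Map<String, String>>> newAssignment,
      Map<String, Map<String, Map<String, String>>> currentAssignment) {
    for (Map.Entry<String, Map<String, Map<String, String>>> resource : currentAssignment.entrySet()) {
      for (Map.Entry<String, Map<String, String>> partition : resource.getValue().entrySet()) {
        // Create the resource/partition slots in newAssignment if they are missing.
        Map<String, String> target = newAssignment
            .computeIfAbsent(resource.getKey(), r -> new HashMap<>())
            .computeIfAbsent(partition.getKey(), p -> new HashMap<>());
        // putIfAbsent keeps any <instance, state> the new computation already decided.
        partition.getValue().forEach(target::putIfAbsent);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> p0 = new HashMap<>();
    p0.put("host1", "MASTER"); // newly computed on an enabled live instance
    Map<String, Map<String, String>> db = new HashMap<>();
    db.put("p0", p0);
    Map<String, Map<String, Map<String, String>>> newAssignment = new HashMap<>();
    newAssignment.put("db", db);

    // The current assignment still has a replica on host3, offline within the delay window.
    Map<String, Map<String, Map<String, String>>> current =
        Map.of("db", Map.of("p0", Map.of("host1", "SLAVE", "host3", "SLAVE")));

    merge(newAssignment, current);
    Map<String, String> merged = newAssignment.get("db").get("p0");
    System.out.println(merged.get("host1") + " " + merged.get("host3")); // prints MASTER SLAVE
  }
}
```

   The loop is sequential on purpose. The diff iterates with `parallelStream()`, and if newAssignment is a plain `HashMap`, adding resources to it from several threads is not thread-safe. A parallel version would need a concurrent map or a per-resource collect step.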



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
