dasahcc commented on a change in pull request #1098:
URL: https://github.com/apache/helix/pull/1098#discussion_r442409405



##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
##########
@@ -352,60 +356,137 @@ public static int getStateCount(String state, 
StateModelDefinition stateModelDef
     }
 
     // (3) Assign normal states to instances.
-    // When we choose the top-state (e.g. MASTER) replica for a partition, we 
prefer to choose it from
-    // these replicas which are already in the secondary states (e.g, SLAVE) 
instead of in lower-state.
-    // This is because a replica in secondary state will take shorter time to 
transition to the top-state,
-    // which could minimize the impact to the application's availability.
-    // To achieve that, we sort the preferenceList based on CurrentState, by 
treating top-state and second-states with
-    // same priority and rely on the fact that Collections.sort() is stable.
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-    Set<String> assigned = new HashSet<>();
-    Set<String> liveAndEnabled = new HashSet<>(liveInstances);
-    liveAndEnabled.removeAll(disabledInstancesForPartition);
-
-    // Sort the instances based on replicas' state priority in the current 
state
-    List<String> sortedPreferenceList = new ArrayList<>(preferenceList);
-    sortedPreferenceList.sort(new StatePriorityComparator(currentStateMap, 
stateModelDef));
-
-    // Assign the state to the instances that appear in the preference list.
-    for (String state : statesPriorityList) {
+    assignStatesToInstances(preferenceList, stateModelDef, currentStateMap, 
liveInstances,
+        disabledInstancesForPartition, bestPossibleStateMap);
+
+    return bestPossibleStateMap;
+  }
+
+  /**
+   * Assign the states to the instances listed in the preference list 
according to inputs.
+   * Note that when we choose the top-state (e.g. MASTER) replica for a 
partition, we prefer to
+   * choose it from these replicas which are already in the secondary states 
(e.g, SLAVE) instead
+   * of in lower-state. This is because a replica in secondary state will take 
shorter time to
+   * transition to the top-state, which could minimize the impact to the 
application's availability.
+   * To achieve that, we sort the preferenceList based on CurrentState, by 
treating top-state and
+   * second-states with same priority and rely on the fact that 
Collections.sort() is stable.
+   */
+  private void assignStatesToInstances(final List<String> preferenceList,
+      final StateModelDefinition stateModelDef, final Map<String, String> 
currentStateMap,
+      final Set<String> liveInstances, final Set<String> 
disabledInstancesForPartition,
+      Map<String, String> bestPossibleStateMap) {
+    // Record the assigned instances to avoid double calculating or conflict 
assignment.
+    Set<String> assignedInstances = new HashSet<>();
+
+    Set<String> liveAndEnabled =
+        liveInstances.stream().filter(instance -> 
!disabledInstancesForPartition.contains(instance))
+            .collect(Collectors.toSet());
+
+    Queue<String> preferredActiveInstanceQueue = new 
LinkedList<>(preferenceList);
+    preferredActiveInstanceQueue.retainAll(liveAndEnabled);
+    int totalCandidateCount = preferredActiveInstanceQueue.size();
+
+    // Sort the preferred instances based on replicas' state priority in the 
current state.
+    // Note that if one instance exists in the current states but not in the 
preference list, then
+    // it won't show in the prioritized list.
+    List<String> currentStatePrioritizedList = new 
ArrayList<>(preferredActiveInstanceQueue);
+    currentStatePrioritizedList.sort(new 
StatePriorityComparator(currentStateMap, stateModelDef));
+    Iterator<String> currentStatePrioritizedInstanceIter = 
currentStatePrioritizedList.iterator();
+
+    // Assign the states to the instances that appear in the preference list.
+    for (String state : stateModelDef.getStatesPriorityList()) {
       int stateCount =
           getStateCount(state, stateModelDef, liveAndEnabled.size(), 
preferenceList.size());
-      for (String instance : preferenceList) {
+      while (!preferredActiveInstanceQueue.isEmpty()) {
         if (stateCount <= 0) {
           break; // continue assigning for the next state
         }
-        if (assigned.contains(instance) || !liveAndEnabled.contains(instance)) 
{
+        String peekInstance = preferredActiveInstanceQueue.peek();
+        if (assignedInstances.contains(peekInstance)) {
+          preferredActiveInstanceQueue.poll();
           continue; // continue checking for the next available instance
         }
-        String proposedInstance = instance;
-        // Additional check and alternate the assignment for reducing top 
state handoff.
-        if (state.equals(stateModelDef.getTopState()) && 
!stateModelDef.getSecondTopStates()
-            .contains(currentStateMap.getOrDefault(instance, 
stateModelDef.getInitialState()))) {
-          // If the desired state is the top state, but the instance cannot be 
transited to the
-          // top state in one hop, try to keep the top state on current host 
or a host with a closer
-          // state.
-          for (String currentStatePrioritizedInstance : sortedPreferenceList) {
-            if (!assigned.contains(currentStatePrioritizedInstance) && 
liveAndEnabled
-                .contains(currentStatePrioritizedInstance)) {
-              proposedInstance = currentStatePrioritizedInstance;
-              break;
-            }
-          }
-          // Note that if all the current top state instances are not 
assignable, then we fallback
-          // to the default logic that assigning the state according to 
preference list order.
+        String proposedInstance = adjustInstanceIfNecessary(state, 
peekInstance,
+            currentStateMap.getOrDefault(peekInstance, 
stateModelDef.getInitialState()),
+            stateModelDef, assignedInstances, totalCandidateCount - 
assignedInstances.size(),
+            stateCount, currentStatePrioritizedInstanceIter);
+
+        if (proposedInstance.equals(peekInstance)) {
+          // If the peeked instance is the final decision, then poll it from 
the queue.
+          preferredActiveInstanceQueue.poll();
         }
-        // Assign the desired state to the proposed instance
+        // else, if we found a different instance for the partition placement, 
then we need to
+        // check the same instance again or it will not be assigned with any 
partitions.
+
+        // Assign the desired state to the proposed instance if not on ERROR 
state.
         if 
(HelixDefinedState.ERROR.toString().equals(currentStateMap.get(proposedInstance)))
 {
           bestPossibleStateMap.put(proposedInstance, 
HelixDefinedState.ERROR.toString());
         } else {
           bestPossibleStateMap.put(proposedInstance, state);
           stateCount--;
         }
-        assigned.add(proposedInstance);
+        // Note that in either case, the proposed instance is considered to be 
assigned with a state
+        // by now.
+        if (!assignedInstances.add(proposedInstance)) {
+          throw new AssertionError(String
+              .format("The proposed instance %s has been already assigned 
before.",
+                  proposedInstance));

Review comment:
       This should not happen, right? You already check assignedInstances both 
when peeking at the queue and after adjusting the instance.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to