dasahcc commented on a change in pull request #1098:
URL: https://github.com/apache/helix/pull/1098#discussion_r442409405
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
##########
@@ -352,60 +356,137 @@ public static int getStateCount(String state,
StateModelDefinition stateModelDef
}
// (3) Assign normal states to instances.
- // When we choose the top-state (e.g. MASTER) replica for a partition, we
prefer to choose it from
- // these replicas which are already in the secondary states (e.g, SLAVE)
instead of in lower-state.
- // This is because a replica in secondary state will take shorter time to
transition to the top-state,
- // which could minimize the impact to the application's availability.
- // To achieve that, we sort the preferenceList based on CurrentState, by
treating top-state and second-states with
- // same priority and rely on the fact that Collections.sort() is stable.
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
- Set<String> assigned = new HashSet<>();
- Set<String> liveAndEnabled = new HashSet<>(liveInstances);
- liveAndEnabled.removeAll(disabledInstancesForPartition);
-
- // Sort the instances based on replicas' state priority in the current
state
- List<String> sortedPreferenceList = new ArrayList<>(preferenceList);
- sortedPreferenceList.sort(new StatePriorityComparator(currentStateMap,
stateModelDef));
-
- // Assign the state to the instances that appear in the preference list.
- for (String state : statesPriorityList) {
+ assignStatesToInstances(preferenceList, stateModelDef, currentStateMap,
liveInstances,
+ disabledInstancesForPartition, bestPossibleStateMap);
+
+ return bestPossibleStateMap;
+ }
+
+ /**
+ * Assign the states to the instances listed in the preference list
according to inputs.
+ * Note that when we choose the top-state (e.g. MASTER) replica for a
partition, we prefer to
+ * choose it from these replicas which are already in the secondary states
(e.g, SLAVE) instead
+ * of in lower-state. This is because a replica in secondary state will take
shorter time to
+ * transition to the top-state, which could minimize the impact to the
application's availability.
+ * To achieve that, we sort the preferenceList based on CurrentState, by
treating top-state and
+ * second-states with same priority and rely on the fact that
Collections.sort() is stable.
+ */
+ private void assignStatesToInstances(final List<String> preferenceList,
+ final StateModelDefinition stateModelDef, final Map<String, String>
currentStateMap,
+ final Set<String> liveInstances, final Set<String>
disabledInstancesForPartition,
+ Map<String, String> bestPossibleStateMap) {
+ // Record the assigned instances to avoid double calculating or conflict
assignment.
+ Set<String> assignedInstances = new HashSet<>();
+
+ Set<String> liveAndEnabled =
+ liveInstances.stream().filter(instance ->
!disabledInstancesForPartition.contains(instance))
+ .collect(Collectors.toSet());
+
+ Queue<String> preferredActiveInstanceQueue = new
LinkedList<>(preferenceList);
+ preferredActiveInstanceQueue.retainAll(liveAndEnabled);
+ int totalCandidateCount = preferredActiveInstanceQueue.size();
+
+ // Sort the preferred instances based on replicas' state priority in the
current state.
+ // Note that if one instance exists in the current states but not in the
preference list, then
+ // it won't show in the prioritized list.
+ List<String> currentStatePrioritizedList = new
ArrayList<>(preferredActiveInstanceQueue);
+ currentStatePrioritizedList.sort(new
StatePriorityComparator(currentStateMap, stateModelDef));
+ Iterator<String> currentStatePrioritizedInstanceIter =
currentStatePrioritizedList.iterator();
+
+ // Assign the states to the instances that appear in the preference list.
+ for (String state : stateModelDef.getStatesPriorityList()) {
int stateCount =
getStateCount(state, stateModelDef, liveAndEnabled.size(),
preferenceList.size());
- for (String instance : preferenceList) {
+ while (!preferredActiveInstanceQueue.isEmpty()) {
if (stateCount <= 0) {
break; // continue assigning for the next state
}
- if (assigned.contains(instance) || !liveAndEnabled.contains(instance))
{
+ String peekInstance = preferredActiveInstanceQueue.peek();
+ if (assignedInstances.contains(peekInstance)) {
+ preferredActiveInstanceQueue.poll();
continue; // continue checking for the next available instance
}
- String proposedInstance = instance;
- // Additional check and alternate the assignment for reducing top
state handoff.
- if (state.equals(stateModelDef.getTopState()) &&
!stateModelDef.getSecondTopStates()
- .contains(currentStateMap.getOrDefault(instance,
stateModelDef.getInitialState()))) {
- // If the desired state is the top state, but the instance cannot be
transited to the
- // top state in one hop, try to keep the top state on current host
or a host with a closer
- // state.
- for (String currentStatePrioritizedInstance : sortedPreferenceList) {
- if (!assigned.contains(currentStatePrioritizedInstance) &&
liveAndEnabled
- .contains(currentStatePrioritizedInstance)) {
- proposedInstance = currentStatePrioritizedInstance;
- break;
- }
- }
- // Note that if all the current top state instances are not
assignable, then we fallback
- // to the default logic that assigning the state according to
preference list order.
+ String proposedInstance = adjustInstanceIfNecessary(state,
peekInstance,
+ currentStateMap.getOrDefault(peekInstance,
stateModelDef.getInitialState()),
+ stateModelDef, assignedInstances, totalCandidateCount -
assignedInstances.size(),
+ stateCount, currentStatePrioritizedInstanceIter);
+
+ if (proposedInstance.equals(peekInstance)) {
+ // If the peeked instance is the final decision, then poll it from
the queue.
+ preferredActiveInstanceQueue.poll();
}
- // Assign the desired state to the proposed instance
+ // else, if we found a different instance for the partition placement,
then we need to
+ // check the same instance again or it will not be assigned with any
partitions.
+
+ // Assign the desired state to the proposed instance if not on ERROR
state.
if
(HelixDefinedState.ERROR.toString().equals(currentStateMap.get(proposedInstance)))
{
bestPossibleStateMap.put(proposedInstance,
HelixDefinedState.ERROR.toString());
} else {
bestPossibleStateMap.put(proposedInstance, state);
stateCount--;
}
- assigned.add(proposedInstance);
+ // Note that in either case, the proposed instance is considered to be
assigned with a state
+ // by now.
+ if (!assignedInstances.add(proposedInstance)) {
+ throw new AssertionError(String
+ .format("The proposed instance %s has been already assigned
before.",
+ proposedInstance));
Review comment:
This should not happen, right? You check assignedInstances both for the
peek of the queue and after adjusting the instance.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]