kaisun2000 commented on a change in pull request #1532:
URL: https://github.com/apache/helix/pull/1532#discussion_r541482967
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -276,116 +191,173 @@ private void
validateMaxPartitionsPerInstance(ClusterEvent event,
}
/**
- * Compute intermediate partition states for a prioritized resource.
- * @param cache
- * @param clusterStatusMonitor
- * @param idealState
- * @param resource
+ * Go through each resource, and based on messageSelected and currentState,
compute
+ * messageOutput while maintaining throttling constraints (for example,
ensure that the number
+ * of possible pending state transitions does NOT go over the set threshold).
+ * @param event
+ * @param resourceMap
* @param currentStateOutput
- * @param bestPossiblePartitionStateMap
- * @param preferenceLists
- * @param throttleController
+ * @param selectedMessage
+ * @param retracedResourceStateMap out
* @return
*/
- private PartitionStateMap
computeIntermediatePartitionState(ResourceControllerDataProvider cache,
- ClusterStatusMonitor clusterStatusMonitor, IdealState idealState,
Resource resource,
- CurrentStateOutput currentStateOutput, PartitionStateMap
bestPossiblePartitionStateMap,
- Map<String, List<String>> preferenceLists,
- StateTransitionThrottleController throttleController) {
- String resourceName = resource.getResourceName();
- LogUtil.logDebug(logger, _eventId, String.format("Processing resource:
%s", resourceName));
+ private MessageOutput compute(ClusterEvent event, Map<String, Resource>
resourceMap,
+ CurrentStateOutput currentStateOutput, MessageOutput selectedMessage,
+ ResourcesStateMap retracedResourceStateMap,
+ List<Message> throttledRecoveryMsg, List<Message> throttledLoadMsg) {
+ MessageOutput output = new MessageOutput();
- // Throttling is applied only on FULL-AUTO mode
- if (!throttleController.isThrottleEnabled()
- ||
!IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())) {
- return bestPossiblePartitionStateMap;
- }
+ ResourceControllerDataProvider dataCache =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
- String stateModelDefName = idealState.getStateModelDefRef();
- StateModelDefinition stateModelDef =
cache.getStateModelDef(stateModelDefName);
- PartitionStateMap intermediatePartitionStateMap = new
PartitionStateMap(resourceName);
+ StateTransitionThrottleController throttleController =
+ new StateTransitionThrottleController(resourceMap.keySet(),
dataCache.getClusterConfig(),
+ dataCache.getLiveInstances().keySet());
- Set<Partition> partitionsNeedRecovery = new HashSet<>();
- Set<Partition> partitionsNeedLoadBalance = new HashSet<>();
- Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
- Map<String, String> bestPossibleMap =
- bestPossiblePartitionStateMap.getPartitionMap(partition);
- List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
+ // Resource level prioritization based on the numerical (sortable)
priority field.
+ // If the resource priority field is null/not set, the resource will be
treated as lowest
+ // priority.
+ List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
+ for (String resourceName : resourceMap.keySet()) {
+ prioritizedResourceList.add(new ResourcePriority(resourceName,
Integer.MIN_VALUE));
+ }
+ // If resourcePriorityField is null at the cluster level, all resources
will be considered equal
+ // in priority by keeping all priorities at MIN_VALUE
+ if (dataCache.getClusterConfig().getResourcePriorityField() != null) {
+ String priorityField =
dataCache.getClusterConfig().getResourcePriorityField();
+ for (ResourcePriority resourcePriority : prioritizedResourceList) {
+ String resourceName = resourcePriority.getResourceName();
- RebalanceType rebalanceType = getRebalanceType(cache, bestPossibleMap,
preferenceList,
- stateModelDef, currentStateMap, idealState,
partition.getPartitionName());
+ // Will take the priority from ResourceConfig first
+ // If ResourceConfig does not exist or does not have this field.
+ // Try to load it from the resource's IdealState. Otherwise, keep it
at the lowest priority
+ if (dataCache.getResourceConfig(resourceName) != null
+ &&
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField) !=
null) {
+ resourcePriority.setPriority(
+
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField));
+ } else if (dataCache.getIdealState(resourceName) != null
+ &&
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField)
+ != null) {
+ resourcePriority.setPriority(
+
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
+ }
+ }
+ prioritizedResourceList.sort(new ResourcePriorityComparator());
+ }
- // TODO: refine getRebalanceType to return more accurate rebalance
types. So the following
- // logic doesn't need to check for more details.
- boolean isRebalanceNeeded = false;
+ List<String> failedResources = new ArrayList<>();
- // Check whether partition has any ERROR state replicas
- if (currentStateMap.values().contains(HelixDefinedState.ERROR.name())) {
- partitionsWithErrorStateReplica.add(partition);
+ // Priority is applied in assignment computation because higher priority
by looping in order of
+ // decreasing priority
+ for (ResourcePriority resourcePriority : prioritizedResourceList) {
+ String resourceName = resourcePriority.getResourceName();
+
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+ if (!bestPossibleStateOutput.containsResource(resourceName)) {
+ LogUtil.logInfo(logger, _eventId, String.format(
+ "Skip calculating per replica state for resource %s because the
best possible state is not available.",
+ resourceName));
+ continue;
}
- // Number of states required by StateModelDefinition are not satisfied,
need recovery
- if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
- // Check if recovery is needed for this partition
- if (!currentStateMap.equals(bestPossibleMap)) {
- partitionsNeedRecovery.add(partition);
- isRebalanceNeeded = true;
- }
- } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
- // Number of states required by StateModelDefinition are satisfied,
but to achieve
- // BestPossibleState, need load balance
- partitionsNeedLoadBalance.add(partition);
- isRebalanceNeeded = true;
+ Resource resource = resourceMap.get(resourceName);
+ IdealState idealState = dataCache.getIdealState(resourceName);
+ if (idealState == null) {
+ // If IdealState is null, use an empty one
+ LogUtil.logInfo(logger, _eventId, String
+ .format("IdealState for resource %s does not exist; resource may
not exist anymore",
+ resourceName));
+ idealState = new IdealState(resourceName);
+ idealState.setStateModelDefRef(resource.getStateModelDefRef());
}
- // Currently at BestPossibleState, no further action necessary
- if (!isRebalanceNeeded) {
- Map<String, String> intermediateMap = new HashMap<>(bestPossibleMap);
- intermediatePartitionStateMap.setState(partition, intermediateMap);
+ Map<Partition, Map<String, String>> retracedPartitionsState = new
HashMap<>();
+ try {
+ Map<Partition, List<Message>> resourceMessages =
+ computePerReplicaPartitionState(idealState, currentStateOutput,
+ selectedMessage.getResourceMessages(resourceName),
resourceMap.get(resourceName),
+ bestPossibleStateOutput, dataCache,
+ throttleController, retracedPartitionsState,
throttledRecoveryMsg, throttledLoadMsg);
+ output.addResourceMessages(resourceName, resourceMessages);
+ retracedResourceStateMap.setState(resourceName,
retracedPartitionsState);
+ } catch (HelixException ex) {
+ LogUtil.logInfo(logger, _eventId,
+ "Failed to calculate per replica partition states for resource " +
resourceName, ex);
+ failedResources.add(resourceName);
}
}
- if (!partitionsNeedRecovery.isEmpty()) {
- LogUtil.logInfo(logger, _eventId, String.format(
- "Recovery balance needed for %s partitions: %s", resourceName,
partitionsNeedRecovery));
- }
- if (!partitionsNeedLoadBalance.isEmpty()) {
- LogUtil.logInfo(logger, _eventId, String.format("Load balance needed for
%s partitions: %s",
- resourceName, partitionsNeedLoadBalance));
- }
- if (!partitionsWithErrorStateReplica.isEmpty()) {
- LogUtil.logInfo(logger, _eventId,
- String.format("Partition currently has an ERROR replica in %s
partitions: %s",
- resourceName, partitionsWithErrorStateReplica));
+ return output;
+ }
+
+ /*
+ * Apply per-replica throttling logic and filter out excessive recovery and
load messages for a
+ * given resource.
+ * Reconstruct retrace partition states for a resource based on pending and
targeted messages
+ * Return messages for partitions of a resource.
+ * Out param retracedPartitionsCurrentState
Review comment:
Feel free to skip IntermediateStateCalcStage; the new logic is in
PerReplicaThrottleStage.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -396,447 +368,386 @@ private PartitionStateMap
computeIntermediatePartitionState(ResourceControllerDa
// Perform regular load balance only if the number of partitions in
recovery and in error is
// less than the threshold. Otherwise, only allow downward-transition load
balance
boolean onlyDownwardLoadBalance = partitionCount > threshold;
-
- loadbalanceThrottledPartitions = loadRebalance(resource,
currentStateOutput,
- bestPossiblePartitionStateMap, throttleController,
intermediatePartitionStateMap,
- partitionsNeedLoadBalance,
currentStateOutput.getCurrentStateMap(resourceName),
- onlyDownwardLoadBalance, stateModelDef, cache);
-
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.updateRebalancerStats(resourceName,
partitionsNeedRecovery.size(),
- partitionsNeedLoadBalance.size(), recoveryThrottledPartitions.size(),
- loadbalanceThrottledPartitions.size());
+ Set<Message> throttledLoadMessages = new HashSet<>();
+ LogUtil.logDebug(logger, _eventId,
+ String.format("applying load rebalance with resource %s,
onlyDownwardLoadBalance %s",
+ resourceName, onlyDownwardLoadBalance));
+ applyThrottling(resource, throttleController, currentStateMap,
bestPossibleMap, idealState,
+ cache, onlyDownwardLoadBalance, loadMessages, messagePartitionMap,
throttledLoadMessages,
+ StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+
+ LogUtil.logDebug(logger, _eventId,
+ String.format("resource %s, throttled recovery message: %s",
resourceName,throttledRecoveryMessages));
+ LogUtil.logDebug(logger, _eventId,
+ String.format("resource %s, throttled load messages: %s",
resourceName,throttledLoadMessages));
+
+ throttledRecoveryMsgOut.addAll(throttledRecoveryMessages);
+ throttledLoadMessageOut.addAll(throttledLoadMessages);
+
+ // Step 5: construct output
+ Map<Partition, List<Message>> out = new HashMap<>();
+ for (Partition partition : resource.getPartitions()) {
+ List<Message> partitionMessages =
selectedResourceMessages.get(partition);
+ if (partitionMessages == null) {
+ continue;
+ }
+ List<Message> finalPartitionMessages = new ArrayList<>();
+ for (Message message: partitionMessages) {
+ if (throttledRecoveryMessages.contains(message)) {
+ continue;
+ }
+ if (throttledLoadMessages.contains(message)) {
+ continue;
+ }
+ finalPartitionMessages.add(message);
+ }
+ out.put(partition, finalPartitionMessages);
}
- if (logger.isDebugEnabled()) {
- logPartitionMapState(resourceName, new
HashSet<>(resource.getPartitions()),
- partitionsNeedRecovery, recoveryThrottledPartitions,
partitionsNeedLoadBalance,
- loadbalanceThrottledPartitions, currentStateOutput,
bestPossiblePartitionStateMap,
- intermediatePartitionStateMap);
+ // Step 6: constructs all retraced partition state map for the resource;
+ for (Partition partition : resource.getPartitions()) {
+ List<Message> partitionMessages = out.get(partition);
+ if (partitionMessages == null) {
+ continue;
+ }
+ for (Message message : partitionMessages) {
+ if
(!Message.MessageType.STATE_TRANSITION.name().equals(message.getMsgType())) {
+ // todo: log?
+ // ignore cancellation message etc.
+ continue;
+ }
+ String toState = message.getToState();
+ // toIntance may not be in the retracedStateMap as so far it is
current state based.
+ // new instance in best possible not in currentstate would not be in
retracedStateMap yet.
+ String toInstance = message.getTgtName();
+ Map<String, String> retracedStateMap =
retracedPartitionsStateMap.get(partition);
+ retracedStateMap.put(toInstance, toState);
+ }
}
-
- LogUtil.logDebug(logger, _eventId, String.format("End processing resource:
%s", resourceName));
- return intermediatePartitionStateMap;
+ return out;
}
- /**
- * Check for a partition, whether all transitions for its replicas are
downward transitions. Note
- * that this function does NOT check for ERROR states.
- * @param currentStateMap
- * @param bestPossibleMap
- * @param stateModelDef
- * @return true if there are; false otherwise
- */
- private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String>
currentStateMap,
- Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef)
{
- Set<String> allInstances = new HashSet<>();
- allInstances.addAll(currentStateMap.keySet());
- allInstances.addAll(bestPossibleMap.keySet());
- Map<String, Integer> statePriorityMap =
stateModelDef.getStatePriorityMap();
+ private void getPartitionExpectedAndCurrentStateCountMap(
+ Partition partition,
+ Map<String, List<String>> preferenceLists,
+ IdealState idealState,
+ ResourceControllerDataProvider cache,
+ Map<String, String> currentStateMap,
+ Map<String, Integer> expectedStateCountMapOut,
+ Map<String, Integer> currentStateCountsOut
+ ) {
+ List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
+ int replica = idealState.getMinActiveReplicas() == -1 ? idealState
+ .getReplicaCount(preferenceList.size()) :
idealState.getMinActiveReplicas();
+ Set<String> activeList = new HashSet<>(preferenceList);
+ activeList.retainAll(cache.getEnabledLiveInstances());
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (currentState == null) {
- return false; // null -> state is upward
- }
- if (bestPossibleState != null) {
- // Compare priority values and return if an upward transition is found
- // Note that lower integer value implies higher priority
- if (!statePriorityMap.containsKey(currentState)
- || !statePriorityMap.containsKey(bestPossibleState)) {
- // If the state is not found in statePriorityMap, consider it not
strictly downward by
- // default because we can't determine whether it is downward
- return false;
- }
- if (statePriorityMap.get(currentState) >
statePriorityMap.get(bestPossibleState)) {
- return false;
- }
- }
- }
- return true;
+ // For each state, check that this partition currently has the required
number of that state as
+ // required by StateModelDefinition.
+ String stateModelDefName = idealState.getStateModelDefRef();
+ StateModelDefinition stateModelDef =
cache.getStateModelDef(stateModelDefName);
+ LinkedHashMap<String, Integer> expectedStateCountMap = stateModelDef
+ .getStateCountMap(activeList.size(), replica); //
StateModelDefinition's counts
+
+ // Current counts without disabled partitions or disabled instances
+ Map<String, String> currentStateMapWithoutDisabled = new
HashMap<>(currentStateMap);
+ currentStateMapWithoutDisabled.keySet().removeAll(cache
+ .getDisabledInstancesForPartition(idealState.getResourceName(),
+ partition.getPartitionName()));
+ Map<String, Integer> currentStateCounts =
+ StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+ expectedStateCountMapOut.putAll(expectedStateCountMap);
+ currentStateCountsOut.putAll(currentStateCounts);
}
- /**
- * Check and charge all pending transitions for throttling.
+ /*
+ * Charge pending messages with recovery or load rebalance and update the
retraced partition map
+ * accordingly.
+ * Also update partitionsNeedRecovery, partitionsWithErrorStateReplica
accordingly which is used
+ * by later steps.
*/
- private void chargePendingTransition(Resource resource, CurrentStateOutput
currentStateOutput,
- StateTransitionThrottleController throttleController, Set<Partition>
partitionsNeedRecovery,
- Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider
cache,
- PartitionStateMap bestPossiblePartitionStateMap,
- PartitionStateMap intermediatePartitionStateMap) {
+ private void chargePendingMessages(Resource resource,
+ StateTransitionThrottleController throttleController,
+ CurrentStateOutput currentStateOutput,
+ BestPossibleStateOutput bestPossibleStateOutput,
+ IdealState idealState,
+ ResourceControllerDataProvider cache,
+ Set<Partition> partitionsNeedRecovery,
+ Set<Partition> partitionsWithErrorStateReplica,
+ Map<Partition, Map<String, String>> retracedPartitionsStateMap) {
+
+ logger.trace("throttleControllerstate->{} before pending message",
throttleController);
String resourceName = resource.getResourceName();
+ Map<String, List<String>> preferenceLists =
+ bestPossibleStateOutput.getPreferenceLists(resourceName);
- // check and charge pending transitions
for (Partition partition : resource.getPartitions()) {
- // Maps instance to its current state
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition);
- // Maps instance to its pending (next) state
- Map<String, String> pendingMap =
- currentStateOutput.getPendingStateMap(resourceName, partition);
-
- StateTransitionThrottleConfig.RebalanceType rebalanceType =
RebalanceType.NONE;
- if (partitionsNeedRecovery.contains(partition)) {
- rebalanceType =
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
- } else if (partitionsNeedLoadbalance.contains(partition)) {
- rebalanceType =
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
- }
-
- if (pendingMap.size() > 0) {
- boolean shouldChargePartition = false;
- for (String instance : pendingMap.keySet()) {
- String currentState = currentStateMap.get(instance);
- String pendingState = pendingMap.get(instance);
- if (pendingState != null && !pendingState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- // Only charge this instance if the partition is not disabled
- throttleController.chargeInstance(rebalanceType, instance);
- shouldChargePartition = true;
- // If there is a pending state transition for the partition, that
means that an assignment
- // has already been made and the state transition message has
already been sent out for the partition
- // in a previous pipeline run. We must honor this and reflect it
by charging for the pending state transition message.
-
- // Since the assignment has already been made for the pending
message, we do a special treatment
- // for it by setting the best possible state directly in
intermediatePartitionStateMap so that the pending
- // message won't be double-assigned or double-charged in recovery
or load balance.
- handlePendingStateTransitionsForThrottling(partition,
partitionsNeedRecovery,
- partitionsNeedLoadbalance, rebalanceType,
bestPossiblePartitionStateMap,
- intermediatePartitionStateMap);
- }
+ Map<String, String> retracedStateMap = new HashMap<>(currentStateMap);
+
+ if (currentStateMap.values().contains(HelixDefinedState.ERROR.name())) {
+ partitionsWithErrorStateReplica.add(partition);
+ }
+
+ Map<String, Integer> expectedStateCountMap = new HashMap<>();
+ Map<String, Integer> currentStateCounts = new HashMap<>();
+ getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists,
idealState,
+ cache, currentStateMap, expectedStateCountMap, currentStateCounts);
+
+ Map<String, Message> pendingMessageMap =
+ currentStateOutput.getPendingMessageMap(resourceName, partition);
+ List<Message> pendingMessages = new
ArrayList<>(pendingMessageMap.values());
+ String stateModelDefName = idealState.getStateModelDefRef();
+ StateModelDefinition stateModelDef =
cache.getStateModelDef(stateModelDefName);
+
+ // sort pendingMessages based on transition priority then timeStamp for
state transition message
+ pendingMessages.sort(new PartitionMessageComparator(stateModelDef));
+ List<Message> recoveryMessages = new ArrayList<>();
+ List<Message> loadMessages = new ArrayList<>();
+ for (Message msg : pendingMessages) {
+ if
(!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+ // ignore cancellation message etc. For now, don't charge them.
+ continue;
}
- if (shouldChargePartition) {
- throttleController.chargeCluster(rebalanceType);
- throttleController.chargeResource(rebalanceType, resourceName);
+ String toState = msg.getToState();
+ if (toState.equals(HelixDefinedState.DROPPED.name()) || toState
+ .equals(HelixDefinedState.ERROR.name())) {
+ continue;
+ }
+
+ Integer expectedCount = expectedStateCountMap.get(toState);
+ Integer currentCount = currentStateCounts.get(toState);
+ expectedCount = expectedCount == null ? 0 : expectedCount;
+ currentCount = currentCount == null ? 0 : currentCount;
+
+ boolean isUpward = !isDownwardTransition(idealState, cache, msg);
+ // the gist is that if there is a topState, we should deem the
topState also satisfy as secondTopState requirement.
+ // upward AND (condition 1 or condition 2)
+ // condition1: currentCount < expectedCount
+ // condition2: currentCount == expected && toState is secondary state
&& currentCount(topState) < expectedCount(topState)
+ String topState = stateModelDef.getTopState();
+ String secondTopState = stateModelDef.getStatesPriorityList().get(1);
+ Integer expectedTopCount = expectedStateCountMap.get(topState);
+ Integer currentTopCount = currentStateCounts.get(topState);
+ currentTopCount = currentTopCount == null ? 0 : currentTopCount;
+ expectedTopCount = expectedTopCount == null ? 0 : expectedTopCount;
+
+ if (isUpward && ((currentCount < expectedCount) || (currentCount ==
expectedCount && toState
+ .equals(secondTopState) && currentTopCount < expectedTopCount))) {
+ recoveryMessages.add(msg);
+ partitionsNeedRecovery.add(partition);
+ // update
+ currentStateCounts.put(toState, currentCount + 1);
+ } else {
+ loadMessages.add(msg);
}
}
+ // charge recovery message and retrace
+ for (Message recoveryMsg : recoveryMessages) {
+ String toState = recoveryMsg.getToState();
+ String toInstance = recoveryMsg.getTgtName();
+ // toInstance should be in currentStateMap
+ retracedStateMap.put(toInstance, toState);
+
+ throttleController
+
.chargeInstance(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+ toInstance);
+ throttleController
+
.chargeCluster(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE);
+ throttleController
+
.chargeResource(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+ resourceName);
+ logger.trace("throttleControllerstate->{} after pending recovery
charge msg:{}", throttleController, recoveryMsg);
+ }
+ // charge load message and retrace;
+ // note if M->S with relay message, we don't charge relay message now.
We would charge relay
+ // message only when it shows in pending messages in the next cycle of
controller run.
+ for (Message loadMsg : loadMessages) {
+ String toState = loadMsg.getToState();
+ String toInstance = loadMsg.getTgtName();
+ retracedStateMap.put(toInstance, toState);
+
+ throttleController
+
.chargeInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
toInstance);
+
throttleController.chargeCluster(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+ throttleController
+
.chargeResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
resourceName);
+ logger.trace("throttleControllerstate->{} after pending load charge
msg:{}", throttleController, loadMsg);
+ }
+ retracedPartitionsStateMap.put(partition, retracedStateMap);
}
- }
-
- /**
- * Sort partitions according to partition priority {@link
PartitionPriorityComparator}, and for
- * each partition, throttle state transitions if needed. Also populate
- * intermediatePartitionStateMap either with BestPossibleState (if no
throttling is necessary) or
- * CurrentState (if throttled).
- * @param resource
- * @param bestPossiblePartitionStateMap
- * @param throttleController
- * @param intermediatePartitionStateMap
- * @param partitionsNeedRecovery
- * @param currentStateOutput
- * @param topState
- * @param cache
- * @return a set of partitions that need recovery but did not get recovered
due to throttling
- */
- private Set<Partition> recoveryRebalance(Resource resource,
- PartitionStateMap bestPossiblePartitionStateMap,
- StateTransitionThrottleController throttleController,
- PartitionStateMap intermediatePartitionStateMap, Set<Partition>
partitionsNeedRecovery,
- CurrentStateOutput currentStateOutput, String topState,
- ResourceControllerDataProvider cache) {
- String resourceName = resource.getResourceName();
- Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
- // Maps Partition -> Instance -> State
- Map<Partition, Map<String, String>> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName);
- List<Partition> partitionsNeedRecoveryPrioritized = new
ArrayList<>(partitionsNeedRecovery);
-
- // We want the result of the intermediate state calculation to be
deterministic. We sort here by
- // partition name to ensure that the order is consistent for inputs fed
into
- // PartitionPriorityComparator sort
-
partitionsNeedRecoveryPrioritized.sort(Comparator.comparing(Partition::getPartitionName));
- partitionsNeedRecoveryPrioritized.sort(new PartitionPriorityComparator(
- bestPossiblePartitionStateMap.getStateMap(), currentStateMap,
topState, true));
-
- // For each partition, apply throttling if needed.
- for (Partition partition : partitionsNeedRecoveryPrioritized) {
- throttleStateTransitionsForPartition(throttleController, resourceName,
partition,
- currentStateOutput, bestPossiblePartitionStateMap,
partitionRecoveryBalanceThrottled,
- intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE,
cache);
- }
- LogUtil.logInfo(logger, _eventId, String.format(
- "For resource %s: Num of partitions needing recovery: %d, Num of
partitions needing recovery"
- + " but throttled (not recovered): %d",
- resourceName, partitionsNeedRecovery.size(),
partitionRecoveryBalanceThrottled.size()));
- return partitionRecoveryBalanceThrottled;
}
- /**
- * Sort partitions according to partition priority {@link
PartitionPriorityComparator}, and for
- * each partition, throttle state transitions if needed. Also populate
- * intermediatePartitionStateMap either with BestPossibleState (if no
throttling is necessary) or
- * CurrentState (if throttled).
- * @param resource
- * @param currentStateOutput
- * @param bestPossiblePartitionStateMap
- * @param throttleController
- * @param intermediatePartitionStateMap
- * @param partitionsNeedLoadbalance
- * @param currentStateMap
- * @param onlyDownwardLoadBalance true when only allowing downward
transitions
- * @param stateModelDef for determining whether a partition's transitions
are strictly downward
- * @param cache
- * @return
+ /*
+ * Classify the messages of each partition into recovery and load messages.
*/
- private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput
currentStateOutput,
- PartitionStateMap bestPossiblePartitionStateMap,
- StateTransitionThrottleController throttleController,
- PartitionStateMap intermediatePartitionStateMap, Set<Partition>
partitionsNeedLoadbalance,
- Map<Partition, Map<String, String>> currentStateMap, boolean
onlyDownwardLoadBalance,
- StateModelDefinition stateModelDef, ResourceControllerDataProvider
cache) {
+ private void classifyMessages(
+ Resource resource,
+ CurrentStateOutput currentStateOutput,
+ BestPossibleStateOutput bestPossibleStateOutput,
+ IdealState idealState,
+ ResourceControllerDataProvider cache,
+ Map<Partition, List<Message>> selectedResourceMessages,
+
+ List<Message> recoveryMessages,
+ List<Message> loadMessages,
+ Map<Message, Partition> messagePartitionMap
+ ) {
+
String resourceName = resource.getResourceName();
- Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
+ Map<String, List<String>> preferenceLists =
+ bestPossibleStateOutput.getPreferenceLists(resourceName);
+ LogUtil.logInfo(logger, _eventId, String.format("Classify message for
resource: %s", resourceName));
- List<Partition> partitionsNeedLoadRebalancePrioritized =
- new ArrayList<>(partitionsNeedLoadbalance);
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceName, partition);
+ Map<String, Integer> expectedStateCountMap = new HashMap<>();
+ Map<String, Integer> currentStateCounts = new HashMap<>();
- // We want the result of the intermediate state calculation to be
deterministic. We sort here by
- // partition name to ensure that the order is consistent for inputs fed
into
- // PartitionPriorityComparator sort
-
partitionsNeedLoadRebalancePrioritized.sort(Comparator.comparing(Partition::getPartitionName));
- partitionsNeedLoadRebalancePrioritized.sort(new
PartitionPriorityComparator(
- bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "",
false));
+ getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists,
idealState,
+ cache, currentStateMap, expectedStateCountMap, currentStateCounts);
- for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
- // If this is a downward load balance, check if the partition's
transition is strictly
- // downward
- if (onlyDownwardLoadBalance) {
- Map<String, String> currentStateMapForPartition =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
- Map<String, String> bestPossibleMapForPartition =
- bestPossiblePartitionStateMap.getPartitionMap(partition);
- if (!isLoadBalanceDownwardForAllReplicas(currentStateMapForPartition,
- bestPossibleMapForPartition, stateModelDef)) {
- // For downward load balance, if a partition's transitions are not
strictly downward,
- // set currentState to intermediateState
- intermediatePartitionStateMap.setState(partition,
currentStateMapForPartition);
+ List<Message> partitionMessages =
selectedResourceMessages.get(partition);
+ if (partitionMessages == null) {
+ continue;
+ }
+
+ String stateModelDefName = idealState.getStateModelDefRef();
+ StateModelDefinition stateModelDef =
cache.getStateModelDef(stateModelDefName);
+ // sort partitionMessages based on transition priority and then creation
timestamp for transition message
+ partitionMessages.sort(new PartitionMessageComparator(stateModelDef));
+ Set<String> disabledInstances =
+ cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName());
+ for (Message msg : partitionMessages) {
+ if
(!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+ if (logger.isDebugEnabled()) {
+ LogUtil.logDebug(logger, _eventId,
+ String.format("Message: %s not subject to throttle in
resource: %s with type %s",
+ msg, resourceName, msg.getMsgType()));
+ }
continue;
}
- }
- throttleStateTransitionsForPartition(throttleController, resourceName,
partition,
- currentStateOutput, bestPossiblePartitionStateMap,
partitionsLoadbalanceThrottled,
- intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
- }
- LogUtil.logInfo(logger, _eventId,
- String.format(
- "For resource %s: Num of partitions needing load-balance: %d, Num
of partitions needing"
- + " load-balance but throttled (not load-balanced): %d",
- resourceName, partitionsNeedLoadbalance.size(),
partitionsLoadbalanceThrottled.size()));
- return partitionsLoadbalanceThrottled;
- }
- /**
- * Check the status on throttling at every level (cluster, resource,
instance) and set
- * intermediatePartitionStateMap accordingly per partition.
- * @param throttleController
- * @param resourceName
- * @param partition
- * @param currentStateOutput
- * @param bestPossiblePartitionStateMap
- * @param partitionsThrottled
- * @param intermediatePartitionStateMap
- * @param rebalanceType
- * @param cache
- */
- private void throttleStateTransitionsForPartition(
- StateTransitionThrottleController throttleController, String
resourceName,
- Partition partition, CurrentStateOutput currentStateOutput,
- PartitionStateMap bestPossiblePartitionStateMap, Set<Partition>
partitionsThrottled,
- PartitionStateMap intermediatePartitionStateMap, RebalanceType
rebalanceType,
- ResourceControllerDataProvider cache) {
-
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
- Map<String, String> bestPossibleMap =
bestPossiblePartitionStateMap.getPartitionMap(partition);
- Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
- allInstances.addAll(bestPossibleMap.keySet());
- Map<String, String> intermediateMap = new HashMap<>();
-
- boolean hasReachedThrottlingLimit = false;
- if (throttleController.shouldThrottleForResource(rebalanceType,
resourceName)) {
- hasReachedThrottlingLimit = true;
- if (logger.isDebugEnabled()) {
- LogUtil.logDebug(logger, _eventId,
- String.format("Throttled on partition: %s in resource: %s",
- partition.getPartitionName(), resourceName));
- }
- } else {
- // throttle if any of the instances are not able to accept state
transitions
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null &&
!bestPossibleState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- if (throttleController.shouldThrottleForInstance(rebalanceType,
instance)) {
- hasReachedThrottlingLimit = true;
+ messagePartitionMap.put(msg, partition);
+ boolean isUpward = !isDownwardTransition(idealState, cache, msg);
+
+ // for disabled disabled instance, the downward transition is not
subjected to load throttling
+ // we will let them pass through ASAP.
+ String instance = msg.getTgtName();
+ if (disabledInstances.contains(instance)) {
+ if (!isUpward) {
if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId,
- String.format(
- "Throttled because of instance: %s for partition: %s in
resource: %s",
- instance, partition.getPartitionName(), resourceName));
+ String.format("Message: %s not subject to throttle in
resource: %s to disabled instancce %s",
+ msg, resourceName, instance));
}
- break;
+ continue;
}
}
- }
- }
- if (!hasReachedThrottlingLimit) {
- // This implies that there is room for more state transitions.
- // Find instances with a replica whose current state is different from
BestPossibleState and
- // "charge" for it, and bestPossibleStates will become intermediate
states
- intermediateMap.putAll(bestPossibleMap);
- boolean shouldChargeForPartition = false;
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null &&
!bestPossibleState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- throttleController.chargeInstance(rebalanceType, instance);
- shouldChargeForPartition = true;
+
+ String toState = msg.getToState();
+ if (toState.equals(HelixDefinedState.DROPPED.name()) || toState
+ .equals(HelixDefinedState.ERROR.name())) {
+ if (logger.isDebugEnabled()) {
+ LogUtil.logDebug(logger, _eventId,
+ String.format("Message: %s not subject to throttle in
resource: %s with toState %s",
+ msg, resourceName, toState));
+ }
+ continue;
}
- }
- if (shouldChargeForPartition) {
- throttleController.chargeCluster(rebalanceType);
- throttleController.chargeResource(rebalanceType, resourceName);
- }
- } else {
- // No more room for more state transitions; current states will just
become intermediate
- // states unless the partition is disabled
- // Add this partition to a set of throttled partitions
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null &&
!bestPossibleState.equals(currentState)
- && cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- // Because this partition is disabled, we allow assignment
- intermediateMap.put(instance, bestPossibleState);
+
+ Integer expectedCount = expectedStateCountMap.get(toState);
+ Integer currentCount = currentStateCounts.get(toState);
+ expectedCount = expectedCount == null ? 0 : expectedCount;
+ currentCount = currentCount == null ? 0 : currentCount;
+
+ String topState = stateModelDef.getTopState();
+ String secondTopState = stateModelDef.getStatesPriorityList().get(1);
+ Integer expectedTopCount = expectedStateCountMap.get(topState);
+ Integer currentTopCount = currentStateCounts.get(topState);
+ currentTopCount = currentTopCount == null ? 0 : currentTopCount;
+ expectedTopCount = expectedTopCount == null ? 0 : expectedTopCount;
+
+ if (isUpward && ((currentCount < expectedCount) || (currentCount ==
expectedCount && toState
+ .equals(secondTopState) && currentTopCount < expectedTopCount))) {
+ recoveryMessages.add(msg);
+ currentStateCounts.put(toState, currentCount + 1);
} else {
- // This partition is not disabled, so it must be throttled by just
passing on the current
- // state
- if (currentState != null) {
- intermediateMap.put(instance, currentState);
- }
- partitionsThrottled.add(partition);
+ loadMessages.add(msg);
}
}
}
- intermediatePartitionStateMap.setState(partition, intermediateMap);
}
- /**
- * For a partition, given its preferenceList, bestPossibleState, and
currentState, determine which
- * type of rebalance is needed to model IdealState's states defined by the
state model definition.
- * @return RebalanceType needed to bring the replicas to idea states
- * RECOVERY_BALANCE - not all required states (replicas) are
available through all
- * replicas, or the partition is disabled
- * NONE - current state matches the ideal state
- * LOAD_BALANCE - although all replicas required exist, Helix needs
to optimize the
- * allocation
- */
- private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
- Map<String, String> bestPossibleMap, List<String> preferenceList,
- StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
- IdealState idealState, String partitionName) {
- if (preferenceList == null) {
- preferenceList = Collections.emptyList();
+ private void applyThrottling(Resource resource,
+ StateTransitionThrottleController throttleController,
+ Map<Partition, Map<String, String>> currentStateMap,
+ Map<Partition, Map<String, String>> bestPossibleMap,
+ IdealState idealState,
+ ResourceControllerDataProvider cache,
+ boolean onlyDownwardLoadBalance,
+ List<Message> messages,
+ Map<Message, Partition> messagePartitionMap,
+ Set<Message> throttledMessages,
+ StateTransitionThrottleConfig.RebalanceType rebalanceType
+ ) {
+ boolean isRecovery = rebalanceType ==
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
+ if (isRecovery && onlyDownwardLoadBalance) {
Review comment:
Feel free to skip the IntermediateStateCalcStage; the new logic is in
PerReplicaThrottleStage.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -396,447 +368,386 @@ private PartitionStateMap
computeIntermediatePartitionState(ResourceControllerDa
// Perform regular load balance only if the number of partitions in
recovery and in error is
// less than the threshold. Otherwise, only allow downward-transition load
balance
boolean onlyDownwardLoadBalance = partitionCount > threshold;
-
- loadbalanceThrottledPartitions = loadRebalance(resource,
currentStateOutput,
- bestPossiblePartitionStateMap, throttleController,
intermediatePartitionStateMap,
- partitionsNeedLoadBalance,
currentStateOutput.getCurrentStateMap(resourceName),
- onlyDownwardLoadBalance, stateModelDef, cache);
-
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.updateRebalancerStats(resourceName,
partitionsNeedRecovery.size(),
- partitionsNeedLoadBalance.size(), recoveryThrottledPartitions.size(),
- loadbalanceThrottledPartitions.size());
+ Set<Message> throttledLoadMessages = new HashSet<>();
+ LogUtil.logDebug(logger, _eventId,
+ String.format("applying load rebalance with resource %s,
onlyDownwardLoadBalance %s",
+ resourceName, onlyDownwardLoadBalance));
+ applyThrottling(resource, throttleController, currentStateMap,
bestPossibleMap, idealState,
+ cache, onlyDownwardLoadBalance, loadMessages, messagePartitionMap,
throttledLoadMessages,
+ StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+
+ LogUtil.logDebug(logger, _eventId,
+ String.format("resource %s, throttled recovery message: %s",
resourceName,throttledRecoveryMessages));
+ LogUtil.logDebug(logger, _eventId,
+ String.format("resource %s, throttled load messages: %s",
resourceName,throttledLoadMessages));
+
+ throttledRecoveryMsgOut.addAll(throttledRecoveryMessages);
+ throttledLoadMessageOut.addAll(throttledLoadMessages);
+
+ // Step 5: construct output
+ Map<Partition, List<Message>> out = new HashMap<>();
+ for (Partition partition : resource.getPartitions()) {
+ List<Message> partitionMessages =
selectedResourceMessages.get(partition);
+ if (partitionMessages == null) {
+ continue;
+ }
+ List<Message> finalPartitionMessages = new ArrayList<>();
+ for (Message message: partitionMessages) {
+ if (throttledRecoveryMessages.contains(message)) {
+ continue;
+ }
+ if (throttledLoadMessages.contains(message)) {
+ continue;
+ }
+ finalPartitionMessages.add(message);
+ }
+ out.put(partition, finalPartitionMessages);
}
- if (logger.isDebugEnabled()) {
- logPartitionMapState(resourceName, new
HashSet<>(resource.getPartitions()),
- partitionsNeedRecovery, recoveryThrottledPartitions,
partitionsNeedLoadBalance,
- loadbalanceThrottledPartitions, currentStateOutput,
bestPossiblePartitionStateMap,
- intermediatePartitionStateMap);
+ // Step 6: constructs all retraced partition state map for the resource;
+ for (Partition partition : resource.getPartitions()) {
+ List<Message> partitionMessages = out.get(partition);
+ if (partitionMessages == null) {
+ continue;
+ }
+ for (Message message : partitionMessages) {
+ if
(!Message.MessageType.STATE_TRANSITION.name().equals(message.getMsgType())) {
+ // todo: log?
+ // ignore cancellation message etc.
+ continue;
+ }
+ String toState = message.getToState();
+ // toIntance may not be in the retracedStateMap as so far it is
current state based.
Review comment:
Feel free to skip the IntermediateStateCalcStage; the new logic is in
PerReplicaThrottleStage.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]