dasahcc commented on a change in pull request #1532:
URL: https://github.com/apache/helix/pull/1532#discussion_r534665820
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -33,182 +32,107 @@
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.helix.monitoring.mbeans.ResourceMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * For partition compute the Intermediate State (instance,state) pair based on
the BestPossibleState
- * and CurrentState, with all constraints applied (such as state transition
throttling).
- */
-public class IntermediateStateCalcStage extends AbstractBaseStage {
+
+public class PerReplicaThrottleStage extends AbstractBaseStage {
private static final Logger logger =
- LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
+ LoggerFactory.getLogger(PerReplicaThrottleStage.class.getName());
+
+ private boolean isEmitThrottledMsg = false;
+
+ public PerReplicaThrottleStage() {
+ this(false);
+ }
+
+ public PerReplicaThrottleStage(boolean enableEmitThrottledMsg) {
+ isEmitThrottledMsg = enableEmitThrottledMsg;
+ }
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
+
CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
- BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+ MessageOutput selectedMessages =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ LogUtil.logDebug(logger, _eventId, String.format("selectedMessages is:
%s", selectedMessages));
+
Map<String, Resource> resourceToRebalance =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
ResourceControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
- if (currentStateOutput == null || bestPossibleStateOutput == null ||
resourceToRebalance == null
+ if (currentStateOutput == null || selectedMessages == null ||
resourceToRebalance == null
|| cache == null) {
throw new StageException(String.format("Missing attributes in event: %s.
"
- + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES
(%s) |DataCache (%s)",
- event, currentStateOutput, bestPossibleStateOutput,
resourceToRebalance, cache));
+ + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s)
|RESOURCES (%s) |DataCache (%s)",
+ event, currentStateOutput, selectedMessages, resourceToRebalance,
cache));
}
- IntermediateStateOutput intermediateStateOutput =
- compute(event, resourceToRebalance, currentStateOutput,
bestPossibleStateOutput);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(),
intermediateStateOutput);
+ ResourcesStateMap retracedResourceStateMap = new ResourcesStateMap();
+ List<Message> throttledRecoveryMsg = new ArrayList<>();
+ List<Message> throttledLoadMsg = new ArrayList<>();
+ MessageOutput output =
+ compute(event, resourceToRebalance, currentStateOutput,
selectedMessages, retracedResourceStateMap, throttledRecoveryMsg,
throttledLoadMsg);
- // Make sure no instance has more replicas/partitions assigned than
maxPartitionPerInstance. If
- // it does, pause the rebalance and put the cluster on maintenance mode
- int maxPartitionPerInstance =
cache.getClusterConfig().getMaxPartitionsPerInstance();
- if (maxPartitionPerInstance > 0) {
- validateMaxPartitionsPerInstance(event, cache, intermediateStateOutput,
- maxPartitionPerInstance);
- }
- }
-
- /**
- * Go through each resource, and based on BestPossibleState and
CurrentState, compute
- * IntermediateState as close to BestPossibleState while maintaining
throttling constraints (for
- * example, ensure that the number of possible pending state transitions
does NOT go over the set
- * threshold).
- * @param event
- * @param resourceMap
- * @param currentStateOutput
- * @param bestPossibleStateOutput
- * @return
- */
- private IntermediateStateOutput compute(ClusterEvent event, Map<String,
Resource> resourceMap,
- CurrentStateOutput currentStateOutput, BestPossibleStateOutput
bestPossibleStateOutput) {
- IntermediateStateOutput output = new IntermediateStateOutput();
- ResourceControllerDataProvider dataCache =
- event.getAttribute(AttributeName.ControllerDataProvider.name());
-
- StateTransitionThrottleController throttleController = new
StateTransitionThrottleController(
- resourceMap.keySet(), dataCache.getClusterConfig(),
dataCache.getLiveInstances().keySet());
-
- // Resource level prioritization based on the numerical (sortable)
priority field.
- // If the resource priority field is null/not set, the resource will be
treated as lowest
- // priority.
- List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
- for (String resourceName : resourceMap.keySet()) {
- prioritizedResourceList.add(new ResourcePriority(resourceName,
Integer.MIN_VALUE));
- }
- // If resourcePriorityField is null at the cluster level, all resources
will be considered equal
- // in priority by keeping all priorities at MIN_VALUE
- if (dataCache.getClusterConfig().getResourcePriorityField() != null) {
- String priorityField =
dataCache.getClusterConfig().getResourcePriorityField();
- for (ResourcePriority resourcePriority : prioritizedResourceList) {
- String resourceName = resourcePriority.getResourceName();
-
- // Will take the priority from ResourceConfig first
- // If ResourceConfig does not exist or does not have this field.
- // Try to load it from the resource's IdealState. Otherwise, keep it
at the lowest priority
- if (dataCache.getResourceConfig(resourceName) != null
- &&
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField) !=
null) {
- resourcePriority.setPriority(
-
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField));
- } else if (dataCache.getIdealState(resourceName) != null && dataCache
-
.getIdealState(resourceName).getRecord().getSimpleField(priorityField) != null)
{
- resourcePriority.setPriority(
-
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
+ if (logger.isDebugEnabled()) {
+ LogUtil.logDebug(logger, _eventId, String.format("output is"));
+ for (String resource : resourceToRebalance.keySet()) {
+ if (output.getResourceMessages(resource) != null) {
+ LogUtil.logDebug(logger, _eventId, String.format("resource: %s",
resource));
+ Map<Partition, List<Message>> partitionListMap =
output.getResourceMessages(resource);
+ for (Partition partition : partitionListMap.keySet()) {
+ for (Message msg : partitionListMap.get(partition)) {
+ LogUtil.logDebug(logger, _eventId, String
+ .format("\tresource: %s, partition: %s, msg: %s", resource,
partition, msg));
+ }
+ }
}
}
- prioritizedResourceList.sort(new ResourcePriorityComparator());
}
+ event.addAttribute(AttributeName.PER_REPLICA_THROTTLED_MESSAGES.name(),
output);
+ LogUtil.logDebug(logger,_eventId, String.format("retraceResourceStateMap
is: %s", retracedResourceStateMap));
+ event.addAttribute(AttributeName.PER_REPLICA_RETRACED_STATES.name(),
retracedResourceStateMap);
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- List<String> failedResources = new ArrayList<>();
-
- // Priority is applied in assignment computation because higher priority
by looping in order of
- // decreasing priority
- for (ResourcePriority resourcePriority : prioritizedResourceList) {
- String resourceName = resourcePriority.getResourceName();
-
- if (!bestPossibleStateOutput.containsResource(resourceName)) {
- LogUtil.logInfo(logger, _eventId, String.format(
- "Skip calculating intermediate state for resource %s because the
best possible state is not available.",
- resourceName));
- continue;
- }
-
- Resource resource = resourceMap.get(resourceName);
- IdealState idealState = dataCache.getIdealState(resourceName);
- if (idealState == null) {
- // If IdealState is null, use an empty one
- LogUtil.logInfo(logger, _eventId,
- String.format(
- "IdealState for resource %s does not exist; resource may not
exist anymore",
- resourceName));
- idealState = new IdealState(resourceName);
- idealState.setStateModelDefRef(resource.getStateModelDefRef());
- }
-
- try {
- output.setState(resourceName,
- computeIntermediatePartitionState(dataCache, clusterStatusMonitor,
idealState,
- resourceMap.get(resourceName), currentStateOutput,
- bestPossibleStateOutput.getPartitionStateMap(resourceName),
- bestPossibleStateOutput.getPreferenceLists(resourceName),
throttleController));
- } catch (HelixException ex) {
- LogUtil.logInfo(logger, _eventId,
- "Failed to calculate intermediate partition states for resource "
+ resourceName, ex);
- failedResources.add(resourceName);
- }
+ if (isEmitThrottledMsg) {
Review comment:
Why do we need this? Is it for backtracing the intermediate state map?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]