zhangmeng916 commented on a change in pull request #1532:
URL: https://github.com/apache/helix/pull/1532#discussion_r541462205



##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -33,182 +32,107 @@
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * For partition compute the Intermediate State (instance,state) pair based on 
the BestPossibleState
- * and CurrentState, with all constraints applied (such as state transition 
throttling).
- */
-public class IntermediateStateCalcStage extends AbstractBaseStage {
+
+public class PerReplicaThrottleStage extends AbstractBaseStage {
   private static final Logger logger =
-      LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
+      LoggerFactory.getLogger(PerReplicaThrottleStage.class.getName());
+
+  private boolean isEmitThrottledMsg = false;
+
+  public PerReplicaThrottleStage() {
+    this(false);
+  }
+
+  public PerReplicaThrottleStage(boolean enableEmitThrottledMsg) {
+    isEmitThrottledMsg = enableEmitThrottledMsg;

Review comment:
       Following our convention, prefix the field with an underscore and name 
the constructor parameter to match, i.e. `_isEmitThrottledMsg = isEmitThrottledMsg;`

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -276,116 +191,173 @@ private void 
validateMaxPartitionsPerInstance(ClusterEvent event,
   }
 
   /**
-   * Compute intermediate partition states for a prioritized resource.
-   * @param cache
-   * @param clusterStatusMonitor
-   * @param idealState
-   * @param resource
+   * Go through each resource, and based on messageSelected and currentState, 
compute
+   * messageOutput while maintaining throttling constraints (for example, 
ensure that the number
+   * of possible pending state transitions does NOT go over the set threshold).
+   * @param event
+   * @param resourceMap
    * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param preferenceLists
-   * @param throttleController
+   * @param selectedMessage
+   * @param retracedResourceStateMap out
    * @return
    */
-  private PartitionStateMap 
computeIntermediatePartitionState(ResourceControllerDataProvider cache,
-      ClusterStatusMonitor clusterStatusMonitor, IdealState idealState, 
Resource resource,
-      CurrentStateOutput currentStateOutput, PartitionStateMap 
bestPossiblePartitionStateMap,
-      Map<String, List<String>> preferenceLists,
-      StateTransitionThrottleController throttleController) {
-    String resourceName = resource.getResourceName();
-    LogUtil.logDebug(logger, _eventId, String.format("Processing resource: 
%s", resourceName));
+  private MessageOutput compute(ClusterEvent event, Map<String, Resource> 
resourceMap,
+      CurrentStateOutput currentStateOutput, MessageOutput selectedMessage,
+      ResourcesStateMap retracedResourceStateMap,
+      List<Message> throttledRecoveryMsg, List<Message> throttledLoadMsg) {
+    MessageOutput output = new MessageOutput();
 
-    // Throttling is applied only on FULL-AUTO mode
-    if (!throttleController.isThrottleEnabled()
-        || 
!IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())) {
-      return bestPossiblePartitionStateMap;
-    }
+    ResourceControllerDataProvider dataCache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
 
-    String stateModelDefName = idealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = 
cache.getStateModelDef(stateModelDefName);
-    PartitionStateMap intermediatePartitionStateMap = new 
PartitionStateMap(resourceName);
+    StateTransitionThrottleController throttleController =
+        new StateTransitionThrottleController(resourceMap.keySet(), 
dataCache.getClusterConfig(),
+            dataCache.getLiveInstances().keySet());
 
-    Set<Partition> partitionsNeedRecovery = new HashSet<>();
-    Set<Partition> partitionsNeedLoadBalance = new HashSet<>();
-    Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, partition);
-      Map<String, String> bestPossibleMap =
-          bestPossiblePartitionStateMap.getPartitionMap(partition);
-      List<String> preferenceList = 
preferenceLists.get(partition.getPartitionName());
+    // Resource level prioritization based on the numerical (sortable) 
priority field.

Review comment:
       I can't tell from the diff whether this part has changed. I assume you 
didn't change any of the logic for the resource priority calculation?

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -396,447 +368,386 @@ private PartitionStateMap 
computeIntermediatePartitionState(ResourceControllerDa
     // Perform regular load balance only if the number of partitions in 
recovery and in error is
     // less than the threshold. Otherwise, only allow downward-transition load 
balance
     boolean onlyDownwardLoadBalance = partitionCount > threshold;
-
-    loadbalanceThrottledPartitions = loadRebalance(resource, 
currentStateOutput,
-        bestPossiblePartitionStateMap, throttleController, 
intermediatePartitionStateMap,
-        partitionsNeedLoadBalance, 
currentStateOutput.getCurrentStateMap(resourceName),
-        onlyDownwardLoadBalance, stateModelDef, cache);
-
-    if (clusterStatusMonitor != null) {
-      clusterStatusMonitor.updateRebalancerStats(resourceName, 
partitionsNeedRecovery.size(),
-          partitionsNeedLoadBalance.size(), recoveryThrottledPartitions.size(),
-          loadbalanceThrottledPartitions.size());
+    Set<Message> throttledLoadMessages = new HashSet<>();
+    LogUtil.logDebug(logger, _eventId,
+        String.format("applying load rebalance with resource %s, 
onlyDownwardLoadBalance %s",
+            resourceName, onlyDownwardLoadBalance));
+    applyThrottling(resource, throttleController, currentStateMap, 
bestPossibleMap, idealState,
+        cache, onlyDownwardLoadBalance, loadMessages, messagePartitionMap, 
throttledLoadMessages,
+        StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+
+    LogUtil.logDebug(logger, _eventId,
+        String.format("resource %s, throttled recovery message: %s", 
resourceName,throttledRecoveryMessages));
+    LogUtil.logDebug(logger, _eventId,
+        String.format("resource %s, throttled load messages: %s", 
resourceName,throttledLoadMessages));
+
+    throttledRecoveryMsgOut.addAll(throttledRecoveryMessages);
+    throttledLoadMessageOut.addAll(throttledLoadMessages);
+
+    // Step 5: construct output
+    Map<Partition, List<Message>> out = new HashMap<>();
+    for (Partition partition : resource.getPartitions()) {
+      List<Message> partitionMessages = 
selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+      List<Message> finalPartitionMessages = new ArrayList<>();
+      for (Message message: partitionMessages) {
+        if (throttledRecoveryMessages.contains(message)) {
+          continue;
+        }
+        if (throttledLoadMessages.contains(message)) {
+          continue;
+        }
+        finalPartitionMessages.add(message);
+      }
+      out.put(partition, finalPartitionMessages);
     }
 
-    if (logger.isDebugEnabled()) {
-      logPartitionMapState(resourceName, new 
HashSet<>(resource.getPartitions()),
-          partitionsNeedRecovery, recoveryThrottledPartitions, 
partitionsNeedLoadBalance,
-          loadbalanceThrottledPartitions, currentStateOutput, 
bestPossiblePartitionStateMap,
-          intermediatePartitionStateMap);
+    // Step 6: constructs all retraced partition state map for the resource;
+    for (Partition partition : resource.getPartitions()) {
+      List<Message> partitionMessages = out.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+      for (Message message : partitionMessages) {
+        if 
(!Message.MessageType.STATE_TRANSITION.name().equals(message.getMsgType())) {
+          // todo: log?
+          // ignore cancellation message etc.
+          continue;
+        }
+        String toState = message.getToState();
+        // toIntance may not be in the retracedStateMap as so far it is 
current state based.

Review comment:
       Can you explain in more detail what the retraced state map contains?

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -276,116 +191,173 @@ private void 
validateMaxPartitionsPerInstance(ClusterEvent event,
   }
 
   /**
-   * Compute intermediate partition states for a prioritized resource.
-   * @param cache
-   * @param clusterStatusMonitor
-   * @param idealState
-   * @param resource
+   * Go through each resource, and based on messageSelected and currentState, 
compute
+   * messageOutput while maintaining throttling constraints (for example, 
ensure that the number
+   * of possible pending state transitions does NOT go over the set threshold).
+   * @param event
+   * @param resourceMap
    * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param preferenceLists
-   * @param throttleController
+   * @param selectedMessage
+   * @param retracedResourceStateMap out
    * @return
    */
-  private PartitionStateMap 
computeIntermediatePartitionState(ResourceControllerDataProvider cache,
-      ClusterStatusMonitor clusterStatusMonitor, IdealState idealState, 
Resource resource,
-      CurrentStateOutput currentStateOutput, PartitionStateMap 
bestPossiblePartitionStateMap,
-      Map<String, List<String>> preferenceLists,
-      StateTransitionThrottleController throttleController) {
-    String resourceName = resource.getResourceName();
-    LogUtil.logDebug(logger, _eventId, String.format("Processing resource: 
%s", resourceName));
+  private MessageOutput compute(ClusterEvent event, Map<String, Resource> 
resourceMap,
+      CurrentStateOutput currentStateOutput, MessageOutput selectedMessage,
+      ResourcesStateMap retracedResourceStateMap,
+      List<Message> throttledRecoveryMsg, List<Message> throttledLoadMsg) {
+    MessageOutput output = new MessageOutput();
 
-    // Throttling is applied only on FULL-AUTO mode
-    if (!throttleController.isThrottleEnabled()
-        || 
!IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())) {
-      return bestPossiblePartitionStateMap;
-    }
+    ResourceControllerDataProvider dataCache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
 
-    String stateModelDefName = idealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = 
cache.getStateModelDef(stateModelDefName);
-    PartitionStateMap intermediatePartitionStateMap = new 
PartitionStateMap(resourceName);
+    StateTransitionThrottleController throttleController =
+        new StateTransitionThrottleController(resourceMap.keySet(), 
dataCache.getClusterConfig(),
+            dataCache.getLiveInstances().keySet());
 
-    Set<Partition> partitionsNeedRecovery = new HashSet<>();
-    Set<Partition> partitionsNeedLoadBalance = new HashSet<>();
-    Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, partition);
-      Map<String, String> bestPossibleMap =
-          bestPossiblePartitionStateMap.getPartitionMap(partition);
-      List<String> preferenceList = 
preferenceLists.get(partition.getPartitionName());
+    // Resource level prioritization based on the numerical (sortable) 
priority field.
+    // If the resource priority field is null/not set, the resource will be 
treated as lowest
+    // priority.
+    List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
+    for (String resourceName : resourceMap.keySet()) {
+      prioritizedResourceList.add(new ResourcePriority(resourceName, 
Integer.MIN_VALUE));
+    }
+    // If resourcePriorityField is null at the cluster level, all resources 
will be considered equal
+    // in priority by keeping all priorities at MIN_VALUE
+    if (dataCache.getClusterConfig().getResourcePriorityField() != null) {
+      String priorityField = 
dataCache.getClusterConfig().getResourcePriorityField();
+      for (ResourcePriority resourcePriority : prioritizedResourceList) {
+        String resourceName = resourcePriority.getResourceName();
 
-      RebalanceType rebalanceType = getRebalanceType(cache, bestPossibleMap, 
preferenceList,
-          stateModelDef, currentStateMap, idealState, 
partition.getPartitionName());
+        // Will take the priority from ResourceConfig first
+        // If ResourceConfig does not exist or does not have this field.
+        // Try to load it from the resource's IdealState. Otherwise, keep it 
at the lowest priority
+        if (dataCache.getResourceConfig(resourceName) != null
+            && 
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField) != 
null) {
+          resourcePriority.setPriority(
+              
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField));
+        } else if (dataCache.getIdealState(resourceName) != null
+            && 
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField)
+            != null) {
+          resourcePriority.setPriority(
+              
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
+        }
+      }
+      prioritizedResourceList.sort(new ResourcePriorityComparator());
+    }
 
-      // TODO: refine getRebalanceType to return more accurate rebalance 
types. So the following
-      // logic doesn't need to check for more details.
-      boolean isRebalanceNeeded = false;
+    List<String> failedResources = new ArrayList<>();
 
-      // Check whether partition has any ERROR state replicas
-      if (currentStateMap.values().contains(HelixDefinedState.ERROR.name())) {
-        partitionsWithErrorStateReplica.add(partition);
+    // Priority is applied in assignment computation because higher priority 
by looping in order of
+    // decreasing priority
+    for (ResourcePriority resourcePriority : prioritizedResourceList) {
+      String resourceName = resourcePriority.getResourceName();
+
+      BestPossibleStateOutput bestPossibleStateOutput =
+          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+      if (!bestPossibleStateOutput.containsResource(resourceName)) {
+        LogUtil.logInfo(logger, _eventId, String.format(
+            "Skip calculating per replica state for resource %s because the 
best possible state is not available.",
+            resourceName));
+        continue;
       }
 
-      // Number of states required by StateModelDefinition are not satisfied, 
need recovery
-      if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
-        // Check if recovery is needed for this partition
-        if (!currentStateMap.equals(bestPossibleMap)) {
-          partitionsNeedRecovery.add(partition);
-          isRebalanceNeeded = true;
-        }
-      } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
-        // Number of states required by StateModelDefinition are satisfied, 
but to achieve
-        // BestPossibleState, need load balance
-        partitionsNeedLoadBalance.add(partition);
-        isRebalanceNeeded = true;
+      Resource resource = resourceMap.get(resourceName);
+      IdealState idealState = dataCache.getIdealState(resourceName);
+      if (idealState == null) {
+        // If IdealState is null, use an empty one
+        LogUtil.logInfo(logger, _eventId, String
+            .format("IdealState for resource %s does not exist; resource may 
not exist anymore",
+                resourceName));
+        idealState = new IdealState(resourceName);
+        idealState.setStateModelDefRef(resource.getStateModelDefRef());
       }
 
-      // Currently at BestPossibleState, no further action necessary
-      if (!isRebalanceNeeded) {
-        Map<String, String> intermediateMap = new HashMap<>(bestPossibleMap);
-        intermediatePartitionStateMap.setState(partition, intermediateMap);
+      Map<Partition, Map<String, String>> retracedPartitionsState = new 
HashMap<>();
+      try {
+        Map<Partition, List<Message>> resourceMessages =
+            computePerReplicaPartitionState(idealState, currentStateOutput,
+                selectedMessage.getResourceMessages(resourceName), 
resourceMap.get(resourceName),
+                bestPossibleStateOutput, dataCache,
+                throttleController, retracedPartitionsState, 
throttledRecoveryMsg, throttledLoadMsg);
+        output.addResourceMessages(resourceName, resourceMessages);
+        retracedResourceStateMap.setState(resourceName, 
retracedPartitionsState);
+      } catch (HelixException ex) {
+        LogUtil.logInfo(logger, _eventId,
+            "Failed to calculate per replica partition states for resource " + 
resourceName, ex);
+        failedResources.add(resourceName);
       }
     }
 
-    if (!partitionsNeedRecovery.isEmpty()) {
-      LogUtil.logInfo(logger, _eventId, String.format(
-          "Recovery balance needed for %s partitions: %s", resourceName, 
partitionsNeedRecovery));
-    }
-    if (!partitionsNeedLoadBalance.isEmpty()) {
-      LogUtil.logInfo(logger, _eventId, String.format("Load balance needed for 
%s partitions: %s",
-          resourceName, partitionsNeedLoadBalance));
-    }
-    if (!partitionsWithErrorStateReplica.isEmpty()) {
-      LogUtil.logInfo(logger, _eventId,
-          String.format("Partition currently has an ERROR replica in %s 
partitions: %s",
-              resourceName, partitionsWithErrorStateReplica));
+    return output;
+  }
+
+  /*
+   * Apply per-replica throttling logic and filter out excessive recovery and 
load messages for a
+   * given resource.
+   * Reconstruct retrace partition states for a resource based on pending and 
targeted messages
+   * Return messages for partitions of a resource.
+   * Out param retracedPartitionsCurrentState

Review comment:
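       What is this? The out parameter `retracedPartitionsCurrentState` isn't explained; please describe what it contains.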

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -259,14 +181,7 @@ private void validateMaxPartitionsPerInstance(ClusterEvent 
event,
                       + " mode due to an instance being assigned more 
replicas/partitions than "
                       + "the limit.");
             }
-
-            ClusterStatusMonitor clusterStatusMonitor =
-                event.getAttribute(AttributeName.clusterStatusMonitor.name());
-            if (clusterStatusMonitor != null) {
-              
clusterStatusMonitor.setResourceRebalanceStates(Collections.singletonList(resource),
-                  
ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
-            }
-            // Throw an exception here so that messages won't be sent out 
based on this mapping
+            //TODO: add metrics

Review comment:
       The previous code already uses `clusterStatusMonitor` here, so I think you can 
just copy that logic over. There's no need for a TODO, right?

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -33,182 +32,107 @@
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * For partition compute the Intermediate State (instance,state) pair based on 
the BestPossibleState
- * and CurrentState, with all constraints applied (such as state transition 
throttling).
- */
-public class IntermediateStateCalcStage extends AbstractBaseStage {
+
+public class PerReplicaThrottleStage extends AbstractBaseStage {
   private static final Logger logger =
-      LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
+      LoggerFactory.getLogger(PerReplicaThrottleStage.class.getName());
+
+  private boolean isEmitThrottledMsg = false;
+
+  public PerReplicaThrottleStage() {
+    this(false);
+  }
+
+  public PerReplicaThrottleStage(boolean enableEmitThrottledMsg) {
+    isEmitThrottledMsg = enableEmitThrottledMsg;
+  }
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
+
     CurrentStateOutput currentStateOutput = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
 
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    MessageOutput selectedMessages = 
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    LogUtil.logDebug(logger, _eventId, String.format("selectedMessages is: 
%s", selectedMessages));
+
     Map<String, Resource> resourceToRebalance =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
     ResourceControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
 
-    if (currentStateOutput == null || bestPossibleStateOutput == null || 
resourceToRebalance == null
+    if (currentStateOutput == null || selectedMessages == null || 
resourceToRebalance == null
         || cache == null) {
       throw new StageException(String.format("Missing attributes in event: %s. 
"
-          + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES 
(%s) |DataCache (%s)",
-          event, currentStateOutput, bestPossibleStateOutput, 
resourceToRebalance, cache));
+              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) 
|RESOURCES (%s) |DataCache (%s)",

Review comment:
       "BEST_POSSIBLE_STATE" in this error message is stale: the check now requires the selected messages (MESSAGES_SELECTED) instead.

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -33,182 +32,107 @@
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * For partition compute the Intermediate State (instance,state) pair based on 
the BestPossibleState
- * and CurrentState, with all constraints applied (such as state transition 
throttling).
- */
-public class IntermediateStateCalcStage extends AbstractBaseStage {
+
+public class PerReplicaThrottleStage extends AbstractBaseStage {
   private static final Logger logger =
-      LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
+      LoggerFactory.getLogger(PerReplicaThrottleStage.class.getName());
+
+  private boolean isEmitThrottledMsg = false;
+
+  public PerReplicaThrottleStage() {
+    this(false);
+  }
+
+  public PerReplicaThrottleStage(boolean enableEmitThrottledMsg) {
+    isEmitThrottledMsg = enableEmitThrottledMsg;
+  }
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
+
     CurrentStateOutput currentStateOutput = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
 
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    MessageOutput selectedMessages = 
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    LogUtil.logDebug(logger, _eventId, String.format("selectedMessages is: 
%s", selectedMessages));
+
     Map<String, Resource> resourceToRebalance =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
     ResourceControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
 
-    if (currentStateOutput == null || bestPossibleStateOutput == null || 
resourceToRebalance == null
+    if (currentStateOutput == null || selectedMessages == null || 
resourceToRebalance == null
         || cache == null) {
       throw new StageException(String.format("Missing attributes in event: %s. 
"
-          + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES 
(%s) |DataCache (%s)",
-          event, currentStateOutput, bestPossibleStateOutput, 
resourceToRebalance, cache));
+              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) 
|RESOURCES (%s) |DataCache (%s)",
+          event, currentStateOutput, selectedMessages, resourceToRebalance, 
cache));
     }
 
-    IntermediateStateOutput intermediateStateOutput =
-        compute(event, resourceToRebalance, currentStateOutput, 
bestPossibleStateOutput);
-    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), 
intermediateStateOutput);
+    ResourcesStateMap retracedResourceStateMap = new ResourcesStateMap();
+    List<Message> throttledRecoveryMsg = new ArrayList<>();
+    List<Message> throttledLoadMsg = new ArrayList<>();
+    MessageOutput output =
+        compute(event, resourceToRebalance, currentStateOutput, 
selectedMessages, retracedResourceStateMap, throttledRecoveryMsg, 
throttledLoadMsg);
 
-    // Make sure no instance has more replicas/partitions assigned than 
maxPartitionPerInstance. If
-    // it does, pause the rebalance and put the cluster on maintenance mode
-    int maxPartitionPerInstance = 
cache.getClusterConfig().getMaxPartitionsPerInstance();
-    if (maxPartitionPerInstance > 0) {
-      validateMaxPartitionsPerInstance(event, cache, intermediateStateOutput,
-          maxPartitionPerInstance);
-    }
-  }
-
-  /**
-   * Go through each resource, and based on BestPossibleState and 
CurrentState, compute
-   * IntermediateState as close to BestPossibleState while maintaining 
throttling constraints (for
-   * example, ensure that the number of possible pending state transitions 
does NOT go over the set
-   * threshold).
-   * @param event
-   * @param resourceMap
-   * @param currentStateOutput
-   * @param bestPossibleStateOutput
-   * @return
-   */
-  private IntermediateStateOutput compute(ClusterEvent event, Map<String, 
Resource> resourceMap,
-      CurrentStateOutput currentStateOutput, BestPossibleStateOutput 
bestPossibleStateOutput) {
-    IntermediateStateOutput output = new IntermediateStateOutput();
-    ResourceControllerDataProvider dataCache =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
-
-    StateTransitionThrottleController throttleController = new 
StateTransitionThrottleController(
-        resourceMap.keySet(), dataCache.getClusterConfig(), 
dataCache.getLiveInstances().keySet());
-
-    // Resource level prioritization based on the numerical (sortable) 
priority field.
-    // If the resource priority field is null/not set, the resource will be 
treated as lowest
-    // priority.
-    List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
-    for (String resourceName : resourceMap.keySet()) {
-      prioritizedResourceList.add(new ResourcePriority(resourceName, 
Integer.MIN_VALUE));
-    }
-    // If resourcePriorityField is null at the cluster level, all resources 
will be considered equal
-    // in priority by keeping all priorities at MIN_VALUE
-    if (dataCache.getClusterConfig().getResourcePriorityField() != null) {
-      String priorityField = 
dataCache.getClusterConfig().getResourcePriorityField();
-      for (ResourcePriority resourcePriority : prioritizedResourceList) {
-        String resourceName = resourcePriority.getResourceName();
-
-        // Will take the priority from ResourceConfig first
-        // If ResourceConfig does not exist or does not have this field.
-        // Try to load it from the resource's IdealState. Otherwise, keep it 
at the lowest priority
-        if (dataCache.getResourceConfig(resourceName) != null
-            && 
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField) != 
null) {
-          resourcePriority.setPriority(
-              
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField));
-        } else if (dataCache.getIdealState(resourceName) != null && dataCache
-            
.getIdealState(resourceName).getRecord().getSimpleField(priorityField) != null) 
{
-          resourcePriority.setPriority(
-              
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
+    if (logger.isDebugEnabled()) {
+      LogUtil.logDebug(logger, _eventId, String.format("output is"));
+      for (String resource : resourceToRebalance.keySet()) {
+        if (output.getResourceMessages(resource) != null) {
+          LogUtil.logDebug(logger, _eventId, String.format("resource: %s", 
resource));
+          Map<Partition, List<Message>> partitionListMap = 
output.getResourceMessages(resource);
+          for (Partition partition : partitionListMap.keySet()) {
+            for (Message msg : partitionListMap.get(partition)) {
+              LogUtil.logDebug(logger, _eventId, String
+                  .format("\tresource: %s, partition: %s,  msg: %s", resource, 
partition, msg));
+            }
+          }
         }
       }
-      prioritizedResourceList.sort(new ResourcePriorityComparator());
     }
+    event.addAttribute(AttributeName.PER_REPLICA_THROTTLED_MESSAGES.name(), 
output);
+    LogUtil.logDebug(logger,_eventId, String.format("retraceResourceStateMap 
is: %s", retracedResourceStateMap));
+    event.addAttribute(AttributeName.PER_REPLICA_RETRACED_STATES.name(), 
retracedResourceStateMap);
 
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    List<String> failedResources = new ArrayList<>();
-
-    // Priority is applied in assignment computation because higher priority 
by looping in order of
-    // decreasing priority
-    for (ResourcePriority resourcePriority : prioritizedResourceList) {
-      String resourceName = resourcePriority.getResourceName();
-
-      if (!bestPossibleStateOutput.containsResource(resourceName)) {
-        LogUtil.logInfo(logger, _eventId, String.format(
-            "Skip calculating intermediate state for resource %s because the 
best possible state is not available.",
-            resourceName));
-        continue;
-      }
-
-      Resource resource = resourceMap.get(resourceName);
-      IdealState idealState = dataCache.getIdealState(resourceName);
-      if (idealState == null) {
-        // If IdealState is null, use an empty one
-        LogUtil.logInfo(logger, _eventId,
-            String.format(
-                "IdealState for resource %s does not exist; resource may not 
exist anymore",
-                resourceName));
-        idealState = new IdealState(resourceName);
-        idealState.setStateModelDefRef(resource.getStateModelDefRef());
-      }
-
-      try {
-        output.setState(resourceName,
-            computeIntermediatePartitionState(dataCache, clusterStatusMonitor, 
idealState,
-                resourceMap.get(resourceName), currentStateOutput,
-                bestPossibleStateOutput.getPartitionStateMap(resourceName),
-                bestPossibleStateOutput.getPreferenceLists(resourceName), 
throttleController));
-      } catch (HelixException ex) {
-        LogUtil.logInfo(logger, _eventId,
-            "Failed to calculate intermediate partition states for resource " 
+ resourceName, ex);
-        failedResources.add(resourceName);
-      }
+    if (isEmitThrottledMsg) {

Review comment:
       This doesn't seem reasonable to me. Having this code in a formal 
release is odd. And how do you plan to enable it? You would still need to check 
in a code change, right?

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -396,447 +368,386 @@ private PartitionStateMap 
computeIntermediatePartitionState(ResourceControllerDa
     // Perform regular load balance only if the number of partitions in 
recovery and in error is
     // less than the threshold. Otherwise, only allow downward-transition load 
balance
     boolean onlyDownwardLoadBalance = partitionCount > threshold;
-
-    loadbalanceThrottledPartitions = loadRebalance(resource, 
currentStateOutput,
-        bestPossiblePartitionStateMap, throttleController, 
intermediatePartitionStateMap,
-        partitionsNeedLoadBalance, 
currentStateOutput.getCurrentStateMap(resourceName),
-        onlyDownwardLoadBalance, stateModelDef, cache);
-
-    if (clusterStatusMonitor != null) {
-      clusterStatusMonitor.updateRebalancerStats(resourceName, 
partitionsNeedRecovery.size(),
-          partitionsNeedLoadBalance.size(), recoveryThrottledPartitions.size(),
-          loadbalanceThrottledPartitions.size());
+    Set<Message> throttledLoadMessages = new HashSet<>();
+    LogUtil.logDebug(logger, _eventId,
+        String.format("applying load rebalance with resource %s, 
onlyDownwardLoadBalance %s",
+            resourceName, onlyDownwardLoadBalance));
+    applyThrottling(resource, throttleController, currentStateMap, 
bestPossibleMap, idealState,
+        cache, onlyDownwardLoadBalance, loadMessages, messagePartitionMap, 
throttledLoadMessages,
+        StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+
+    LogUtil.logDebug(logger, _eventId,
+        String.format("resource %s, throttled recovery message: %s", 
resourceName,throttledRecoveryMessages));
+    LogUtil.logDebug(logger, _eventId,
+        String.format("resource %s, throttled load messages: %s", 
resourceName,throttledLoadMessages));
+
+    throttledRecoveryMsgOut.addAll(throttledRecoveryMessages);
+    throttledLoadMessageOut.addAll(throttledLoadMessages);
+
+    // Step 5: construct output
+    Map<Partition, List<Message>> out = new HashMap<>();
+    for (Partition partition : resource.getPartitions()) {
+      List<Message> partitionMessages = 
selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+      List<Message> finalPartitionMessages = new ArrayList<>();
+      for (Message message: partitionMessages) {
+        if (throttledRecoveryMessages.contains(message)) {
+          continue;
+        }
+        if (throttledLoadMessages.contains(message)) {
+          continue;
+        }
+        finalPartitionMessages.add(message);
+      }
+      out.put(partition, finalPartitionMessages);
     }
 
-    if (logger.isDebugEnabled()) {
-      logPartitionMapState(resourceName, new 
HashSet<>(resource.getPartitions()),
-          partitionsNeedRecovery, recoveryThrottledPartitions, 
partitionsNeedLoadBalance,
-          loadbalanceThrottledPartitions, currentStateOutput, 
bestPossiblePartitionStateMap,
-          intermediatePartitionStateMap);
+    // Step 6: constructs all retraced partition state map for the resource;
+    for (Partition partition : resource.getPartitions()) {
+      List<Message> partitionMessages = out.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+      for (Message message : partitionMessages) {
+        if 
(!Message.MessageType.STATE_TRANSITION.name().equals(message.getMsgType())) {
+          // todo: log?
+          // ignore cancellation message etc.
+          continue;
+        }
+        String toState = message.getToState();
+        // toIntance may not be in the retracedStateMap as so far it is 
current state based.
+        // new instance in best possible not in currentstate would not be in 
retracedStateMap yet.
+        String toInstance = message.getTgtName();
+        Map<String, String> retracedStateMap = 
retracedPartitionsStateMap.get(partition);
+        retracedStateMap.put(toInstance, toState);
+      }
     }
-
-    LogUtil.logDebug(logger, _eventId, String.format("End processing resource: 
%s", resourceName));
-    return intermediatePartitionStateMap;
+    return out;
   }
 
-  /**
-   * Check for a partition, whether all transitions for its replicas are 
downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
-   */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> 
currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) 
{
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
+  private void getPartitionExpectedAndCurrentStateCountMap(
+      Partition partition,
+      Map<String, List<String>> preferenceLists,
+      IdealState idealState,
+      ResourceControllerDataProvider cache,
+      Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut,
+      Map<String, Integer> currentStateCountsOut
+  ) {
+    List<String> preferenceList = 
preferenceLists.get(partition.getPartitionName());
+    int replica = idealState.getMinActiveReplicas() == -1 ? idealState
+        .getReplicaCount(preferenceList.size()) : 
idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
 
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not 
strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > 
statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+    // For each state, check that this partition currently has the required 
number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = 
cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap = stateModelDef
+        .getStateCountMap(activeList.size(), replica); // 
StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new 
HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
   }
 
-  /**
-   * Check and charge all pending transitions for throttling.
+  /*
+   * Charge pending messages with recovery or load rebalance and update the 
retraced partition map
+   * accordingly.
+   * Also update partitionsNeedRecovery, partitionsWithErrorStateReplica 
accordingly which is used
+   * by later steps.
    */
-  private void chargePendingTransition(Resource resource, CurrentStateOutput 
currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> 
partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider 
cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
-      PartitionStateMap intermediatePartitionStateMap) {
+  private void chargePendingMessages(Resource resource,
+      StateTransitionThrottleController throttleController,
+      CurrentStateOutput currentStateOutput,
+      BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState,
+      ResourceControllerDataProvider cache,
+      Set<Partition> partitionsNeedRecovery,
+      Set<Partition> partitionsWithErrorStateReplica,
+      Map<Partition, Map<String, String>> retracedPartitionsStateMap) {
+
+    logger.trace("throttleControllerstate->{} before pending message", 
throttleController);
     String resourceName = resource.getResourceName();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
 
-    // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
-      // Maps instance to its current state
       Map<String, String> currentStateMap =
           currentStateOutput.getCurrentStateMap(resourceName, partition);
-      // Maps instance to its pending (next) state
-      Map<String, String> pendingMap =
-          currentStateOutput.getPendingStateMap(resourceName, partition);
-
-      StateTransitionThrottleConfig.RebalanceType rebalanceType = 
RebalanceType.NONE;
-      if (partitionsNeedRecovery.contains(partition)) {
-        rebalanceType = 
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
-      } else if (partitionsNeedLoadbalance.contains(partition)) {
-        rebalanceType = 
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
-      }
-
-      if (pendingMap.size() > 0) {
-        boolean shouldChargePartition = false;
-        for (String instance : pendingMap.keySet()) {
-          String currentState = currentStateMap.get(instance);
-          String pendingState = pendingMap.get(instance);
-          if (pendingState != null && !pendingState.equals(currentState)
-              && !cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName())
-              .contains(instance)) {
-            // Only charge this instance if the partition is not disabled
-            throttleController.chargeInstance(rebalanceType, instance);
-            shouldChargePartition = true;
-            // If there is a pending state transition for the partition, that 
means that an assignment
-            // has already been made and the state transition message has 
already been sent out for the partition
-            // in a previous pipeline run. We must honor this and reflect it 
by charging for the pending state transition message.
-
-            // Since the assignment has already been made for the pending 
message, we do a special treatment
-            // for it by setting the best possible state directly in 
intermediatePartitionStateMap so that the pending
-            // message won't be double-assigned or double-charged in recovery 
or load balance.
-            handlePendingStateTransitionsForThrottling(partition, 
partitionsNeedRecovery,
-                partitionsNeedLoadbalance, rebalanceType, 
bestPossiblePartitionStateMap,
-                intermediatePartitionStateMap);
-          }
+      Map<String, String> retracedStateMap = new HashMap<>(currentStateMap);
+
+      if (currentStateMap.values().contains(HelixDefinedState.ERROR.name())) {
+        partitionsWithErrorStateReplica.add(partition);
+      }
+
+      Map<String, Integer> expectedStateCountMap = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();
+      getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, 
idealState,
+          cache, currentStateMap, expectedStateCountMap, currentStateCounts);
+
+      Map<String, Message> pendingMessageMap =
+          currentStateOutput.getPendingMessageMap(resourceName, partition);
+      List<Message> pendingMessages = new 
ArrayList<>(pendingMessageMap.values());
+      String stateModelDefName = idealState.getStateModelDefRef();
+      StateModelDefinition stateModelDef = 
cache.getStateModelDef(stateModelDefName);
+
+      // sort pendingMessages based on transition priority then timeStamp for 
state transition message
+      pendingMessages.sort(new PartitionMessageComparator(stateModelDef));
+      List<Message> recoveryMessages = new ArrayList<>();
+      List<Message> loadMessages = new ArrayList<>();
+      for (Message msg : pendingMessages) {
+        if 
(!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+          // ignore cancellation message etc. For now, don't charge them.
+          continue;
         }
-        if (shouldChargePartition) {
-          throttleController.chargeCluster(rebalanceType);
-          throttleController.chargeResource(rebalanceType, resourceName);
+        String toState = msg.getToState();
+        if (toState.equals(HelixDefinedState.DROPPED.name()) || toState
+            .equals(HelixDefinedState.ERROR.name())) {
+          continue;
+        }
+
+        Integer expectedCount = expectedStateCountMap.get(toState);
+        Integer currentCount = currentStateCounts.get(toState);
+        expectedCount = expectedCount == null ? 0 : expectedCount;
+        currentCount = currentCount == null ? 0 : currentCount;
+
+        boolean isUpward = !isDownwardTransition(idealState, cache, msg);
+        // the gist is that if there is a topState, we should deem the 
topState also satisfy as secondTopState requirement.
+        // upward AND (condition 1 or condition 2)
+        // condition1: currentCount < expectedCount
+        // condition2: currentCount == expected && toState is secondary state 
&& currentCount(topState) < expectedCount(topState)
+        String topState = stateModelDef.getTopState();
+        String secondTopState = stateModelDef.getStatesPriorityList().get(1);
+        Integer expectedTopCount = expectedStateCountMap.get(topState);
+        Integer currentTopCount = currentStateCounts.get(topState);
+        currentTopCount = currentTopCount == null ? 0 : currentTopCount;
+        expectedTopCount = expectedTopCount == null ? 0 : expectedTopCount;
+
+        if (isUpward && ((currentCount < expectedCount) || (currentCount == 
expectedCount && toState
+            .equals(secondTopState) && currentTopCount < expectedTopCount))) {
+          recoveryMessages.add(msg);
+          partitionsNeedRecovery.add(partition);
+          // update
+          currentStateCounts.put(toState, currentCount + 1);
+        } else {
+          loadMessages.add(msg);
         }
       }
+      // charge recovery message and retrace
+      for (Message recoveryMsg : recoveryMessages) {
+        String toState = recoveryMsg.getToState();
+        String toInstance = recoveryMsg.getTgtName();
+        // toInstance should be in currentStateMap
+        retracedStateMap.put(toInstance, toState);
+
+        throttleController
+            
.chargeInstance(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+                toInstance);
+        throttleController
+            
.chargeCluster(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE);
+        throttleController
+            
.chargeResource(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+                resourceName);
+        logger.trace("throttleControllerstate->{} after pending recovery 
charge msg:{}", throttleController, recoveryMsg);
+      }
+      // charge load message and retrace;
+      // note if M->S with relay message, we don't charge relay message now. 
We would charge relay
+      // message only when it shows in pending messages in the next cycle of 
controller run.
+      for (Message loadMsg : loadMessages) {
+        String toState = loadMsg.getToState();
+        String toInstance = loadMsg.getTgtName();
+        retracedStateMap.put(toInstance, toState);
+
+        throttleController
+            
.chargeInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, 
toInstance);
+        
throttleController.chargeCluster(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+        throttleController
+            
.chargeResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, 
resourceName);
+        logger.trace("throttleControllerstate->{} after pending load charge 
msg:{}", throttleController, loadMsg);
+      }
+      retracedPartitionsStateMap.put(partition, retracedStateMap);
     }
-  }
-
-  /**
-   * Sort partitions according to partition priority {@link 
PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no 
throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedRecovery
-   * @param currentStateOutput
-   * @param topState
-   * @param cache
-   * @return a set of partitions that need recovery but did not get recovered 
due to throttling
-   */
-  private Set<Partition> recoveryRebalance(Resource resource,
-      PartitionStateMap bestPossiblePartitionStateMap,
-      StateTransitionThrottleController throttleController,
-      PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedRecovery,
-      CurrentStateOutput currentStateOutput, String topState,
-      ResourceControllerDataProvider cache) {
-    String resourceName = resource.getResourceName();
-    Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
 
-    // Maps Partition -> Instance -> State
-    Map<Partition, Map<String, String>> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName);
-    List<Partition> partitionsNeedRecoveryPrioritized = new 
ArrayList<>(partitionsNeedRecovery);
-
-    // We want the result of the intermediate state calculation to be 
deterministic. We sort here by
-    // partition name to ensure that the order is consistent for inputs fed 
into
-    // PartitionPriorityComparator sort
-    
partitionsNeedRecoveryPrioritized.sort(Comparator.comparing(Partition::getPartitionName));
-    partitionsNeedRecoveryPrioritized.sort(new PartitionPriorityComparator(
-        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, 
topState, true));
-
-    // For each partition, apply throttling if needed.
-    for (Partition partition : partitionsNeedRecoveryPrioritized) {
-      throttleStateTransitionsForPartition(throttleController, resourceName, 
partition,
-          currentStateOutput, bestPossiblePartitionStateMap, 
partitionRecoveryBalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE, 
cache);
-    }
-    LogUtil.logInfo(logger, _eventId, String.format(
-        "For resource %s: Num of partitions needing recovery: %d, Num of 
partitions needing recovery"
-            + " but throttled (not recovered): %d",
-        resourceName, partitionsNeedRecovery.size(), 
partitionRecoveryBalanceThrottled.size()));
-    return partitionRecoveryBalanceThrottled;
   }
 
-  /**
-   * Sort partitions according to partition priority {@link 
PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no 
throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedLoadbalance
-   * @param currentStateMap
-   * @param onlyDownwardLoadBalance true when only allowing downward 
transitions
-   * @param stateModelDef for determining whether a partition's transitions 
are strictly downward
-   * @param cache
-   * @return
+  /*
+   * Classify the messages of each partition into recovery and load messages.
    */
-  private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput 
currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap,
-      StateTransitionThrottleController throttleController,
-      PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedLoadbalance,
-      Map<Partition, Map<String, String>> currentStateMap, boolean 
onlyDownwardLoadBalance,
-      StateModelDefinition stateModelDef, ResourceControllerDataProvider 
cache) {
+  private void classifyMessages(
+      Resource resource,
+      CurrentStateOutput currentStateOutput,
+      BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState,
+      ResourceControllerDataProvider cache,
+      Map<Partition, List<Message>> selectedResourceMessages,
+
+      List<Message> recoveryMessages,
+      List<Message> loadMessages,
+      Map<Message, Partition> messagePartitionMap
+  ) {
+
     String resourceName = resource.getResourceName();
-    Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
+    LogUtil.logInfo(logger, _eventId, String.format("Classify message for 
resource: %s", resourceName));
 
-    List<Partition> partitionsNeedLoadRebalancePrioritized =
-        new ArrayList<>(partitionsNeedLoadbalance);
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, Integer> expectedStateCountMap = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();
 
-    // We want the result of the intermediate state calculation to be 
deterministic. We sort here by
-    // partition name to ensure that the order is consistent for inputs fed 
into
-    // PartitionPriorityComparator sort
-    
partitionsNeedLoadRebalancePrioritized.sort(Comparator.comparing(Partition::getPartitionName));
-    partitionsNeedLoadRebalancePrioritized.sort(new 
PartitionPriorityComparator(
-        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", 
false));
+      getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, 
idealState,
+          cache, currentStateMap, expectedStateCountMap, currentStateCounts);
 
-    for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
-      // If this is a downward load balance, check if the partition's 
transition is strictly
-      // downward
-      if (onlyDownwardLoadBalance) {
-        Map<String, String> currentStateMapForPartition =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-        Map<String, String> bestPossibleMapForPartition =
-            bestPossiblePartitionStateMap.getPartitionMap(partition);
-        if (!isLoadBalanceDownwardForAllReplicas(currentStateMapForPartition,
-            bestPossibleMapForPartition, stateModelDef)) {
-          // For downward load balance, if a partition's transitions are not 
strictly downward,
-          // set currentState to intermediateState
-          intermediatePartitionStateMap.setState(partition, 
currentStateMapForPartition);
+      List<Message> partitionMessages = 
selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+
+      String stateModelDefName = idealState.getStateModelDefRef();
+      StateModelDefinition stateModelDef = 
cache.getStateModelDef(stateModelDefName);
+      // sort partitionMessages based on transition priority and then creation 
timestamp for transition message
+      partitionMessages.sort(new PartitionMessageComparator(stateModelDef));
+      Set<String> disabledInstances =
+          cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName());
+      for (Message msg : partitionMessages) {
+        if 
(!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId,
+                String.format("Message: %s not subject to throttle in 
resource: %s with type %s",
+                    msg, resourceName, msg.getMsgType()));
+          }
           continue;
         }
-      }
-      throttleStateTransitionsForPartition(throttleController, resourceName, 
partition,
-          currentStateOutput, bestPossiblePartitionStateMap, 
partitionsLoadbalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
-    }
-    LogUtil.logInfo(logger, _eventId,
-        String.format(
-            "For resource %s: Num of partitions needing load-balance: %d, Num 
of partitions needing"
-                + " load-balance but throttled (not load-balanced): %d",
-            resourceName, partitionsNeedLoadbalance.size(), 
partitionsLoadbalanceThrottled.size()));
-    return partitionsLoadbalanceThrottled;
-  }
 
-  /**
-   * Check the status on throttling at every level (cluster, resource, 
instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
-   */
-  private void throttleStateTransitionsForPartition(
-      StateTransitionThrottleController throttleController, String 
resourceName,
-      Partition partition, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> 
partitionsThrottled,
-      PartitionStateMap intermediatePartitionStateMap, RebalanceType 
rebalanceType,
-      ResourceControllerDataProvider cache) {
-
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName, partition);
-    Map<String, String> bestPossibleMap = 
bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<>();
-
-    boolean hasReachedThrottlingLimit = false;
-    if (throttleController.shouldThrottleForResource(rebalanceType, 
resourceName)) {
-      hasReachedThrottlingLimit = true;
-      if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId,
-            String.format("Throttled on partition: %s in resource: %s",
-                partition.getPartitionName(), resourceName));
-      }
-    } else {
-      // throttle if any of the instances are not able to accept state 
transitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && 
!bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName())
-                .contains(instance)) {
-          if (throttleController.shouldThrottleForInstance(rebalanceType, 
instance)) {
-            hasReachedThrottlingLimit = true;
+        messagePartitionMap.put(msg, partition);
+        boolean isUpward = !isDownwardTransition(idealState, cache, msg);
+
+        // for disabled disabled instance, the downward transition is not 
subjected to load throttling
+        // we will let them pass through ASAP.
+        String instance = msg.getTgtName();
+        if (disabledInstances.contains(instance)) {
+          if (!isUpward) {
             if (logger.isDebugEnabled()) {
               LogUtil.logDebug(logger, _eventId,
-                  String.format(
-                      "Throttled because of instance: %s for partition: %s in 
resource: %s",
-                      instance, partition.getPartitionName(), resourceName));
+                  String.format("Message: %s not subject to throttle in 
resource: %s to disabled instancce %s",
+                      msg, resourceName, instance));
             }
-            break;
+            continue;
           }
         }
-      }
-    }
-    if (!hasReachedThrottlingLimit) {
-      // This implies that there is room for more state transitions.
-      // Find instances with a replica whose current state is different from 
BestPossibleState and
-      // "charge" for it, and bestPossibleStates will become intermediate 
states
-      intermediateMap.putAll(bestPossibleMap);
-      boolean shouldChargeForPartition = false;
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && 
!bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName())
-                .contains(instance)) {
-          throttleController.chargeInstance(rebalanceType, instance);
-          shouldChargeForPartition = true;
+
+        String toState = msg.getToState();
+        if (toState.equals(HelixDefinedState.DROPPED.name()) || toState
+            .equals(HelixDefinedState.ERROR.name())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId,
+                String.format("Message: %s not subject to throttle in 
resource: %s with toState %s",
+                    msg, resourceName, toState));
+          }
+          continue;
         }
-      }
-      if (shouldChargeForPartition) {
-        throttleController.chargeCluster(rebalanceType);
-        throttleController.chargeResource(rebalanceType, resourceName);
-      }
-    } else {
-      // No more room for more state transitions; current states will just 
become intermediate
-      // states unless the partition is disabled
-      // Add this partition to a set of throttled partitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && 
!bestPossibleState.equals(currentState)
-            && cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName())
-                .contains(instance)) {
-          // Because this partition is disabled, we allow assignment
-          intermediateMap.put(instance, bestPossibleState);
+
+        Integer expectedCount = expectedStateCountMap.get(toState);
+        Integer currentCount = currentStateCounts.get(toState);
+        expectedCount = expectedCount == null ? 0 : expectedCount;
+        currentCount = currentCount == null ? 0 : currentCount;
+
+        String topState = stateModelDef.getTopState();
+        String secondTopState = stateModelDef.getStatesPriorityList().get(1);
+        Integer expectedTopCount = expectedStateCountMap.get(topState);
+        Integer currentTopCount = currentStateCounts.get(topState);
+        currentTopCount = currentTopCount == null ? 0 : currentTopCount;
+        expectedTopCount = expectedTopCount == null ? 0 : expectedTopCount;
+
+        if (isUpward && ((currentCount < expectedCount) || (currentCount == 
expectedCount && toState
+            .equals(secondTopState) && currentTopCount < expectedTopCount))) {
+          recoveryMessages.add(msg);
+          currentStateCounts.put(toState, currentCount + 1);
         } else {
-          // This partition is not disabled, so it must be throttled by just 
passing on the current
-          // state
-          if (currentState != null) {
-            intermediateMap.put(instance, currentState);
-          }
-          partitionsThrottled.add(partition);
+          loadMessages.add(msg);
         }
       }
     }
-    intermediatePartitionStateMap.setState(partition, intermediateMap);
   }
 
-  /**
-   * For a partition, given its preferenceList, bestPossibleState, and 
currentState, determine which
-   * type of rebalance is needed to model IdealState's states defined by the 
state model definition.
-   * @return RebalanceType needed to bring the replicas to idea states
-   *         RECOVERY_BALANCE - not all required states (replicas) are 
available through all
-   *         replicas, or the partition is disabled
-   *         NONE - current state matches the ideal state
-   *         LOAD_BALANCE - although all replicas required exist, Helix needs 
to optimize the
-   *         allocation
-   */
-  private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
-      Map<String, String> bestPossibleMap, List<String> preferenceList,
-      StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
-      IdealState idealState, String partitionName) {
-    if (preferenceList == null) {
-      preferenceList = Collections.emptyList();
+  private void applyThrottling(Resource resource,
+      StateTransitionThrottleController throttleController,
+      Map<Partition, Map<String, String>> currentStateMap,
+      Map<Partition, Map<String, String>> bestPossibleMap,
+      IdealState idealState,
+      ResourceControllerDataProvider cache,
+      boolean onlyDownwardLoadBalance,
+      List<Message> messages,
+      Map<Message, Partition> messagePartitionMap,
+      Set<Message> throttledMessages,
+      StateTransitionThrottleConfig.RebalanceType rebalanceType
+  ) {
+    boolean isRecovery = rebalanceType == 
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
+    if (isRecovery && onlyDownwardLoadBalance) {

Review comment:
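       This check seems redundant. This function is only called internally, and at every call site we already know that a RECOVERY_BALANCE rebalance type is never combined with onlyDownwardLoadBalance, so this condition can never be true.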




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to