junkaixue commented on code in PR #2546:
URL: https://github.com/apache/helix/pull/2546#discussion_r1248673105


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java:
##########
@@ -42,79 +49,102 @@ public class WagedInstanceCapacity implements 
InstanceCapacityDataProvider {
 
   // Available Capacity per Instance
   private final Map<String, Map<String, Integer>> _instanceCapacityMap;
-  private final ResourceControllerDataProvider _cache;
+  private final Map<String, Map<String, Set<String>>> _allocatedPartitionsMap;
 
   public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) {
-    _cache = clusterData;
     _instanceCapacityMap = new HashMap<>();
-
-    ClusterConfig clusterConfig = _cache.getClusterConfig();
-    for (InstanceConfig instanceConfig : 
_cache.getInstanceConfigMap().values()) {
-      Map<String, Integer> instanceCapacity =
-        WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, 
instanceConfig);
+    _allocatedPartitionsMap = new HashMap<>();
+    ClusterConfig clusterConfig = clusterData.getClusterConfig();
+    for (InstanceConfig instanceConfig : 
clusterData.getInstanceConfigMap().values()) {
+      Map<String, Integer> instanceCapacity = 
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, 
instanceConfig);
       _instanceCapacityMap.put(instanceConfig.getInstanceName(), 
instanceCapacity);
+
+      _allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new 
HashMap<>());
     }
   }
 
-  /**
-   * Create Default Capacity Map.
-   * This is a utility method to create a default capacity map matching 
instance capacity map for participants.
-   * This is required as non-WAGED partitions will be placed on same instance 
and we don't know their actual capacity.
-   * This will generate default values of 0 for all the capacity keys.
-   */
-  private Map<String, Integer> createDefaultParticipantWeight() {
-    // copy the value of first Instance capacity.
-    Map<String, Integer> partCapacity = new 
HashMap<>(_instanceCapacityMap.values().iterator().next());
+  // Helper methods.
+  private boolean isPartitionInAllocatedMap(String instance, String resource, 
String partition) {
+    return _allocatedPartitionsMap.get(instance).containsKey(resource)
+        && 
_allocatedPartitionsMap.get(instance).get(resource).contains(partition);
+  }
 
-    // Set the value of all capacity to -1.
-    for (String key : partCapacity.keySet()) {
-      partCapacity.put(key, -1);
-    }
-    return partCapacity;
+  public void process(ResourceControllerDataProvider cache, CurrentStateOutput 
currentStateOutput,
+      Map<String, Resource> resourceMap, WagedResourceWeightsProvider 
weightProvider) {
+    processPendingMessages(cache, currentStateOutput, resourceMap, 
weightProvider);
+    processCurrentState(cache, currentStateOutput, resourceMap, 
weightProvider);
   }
 
   /**
    * Process the pending messages based on the Current states
    * @param currentState - Current state of the resources.
    */
-  public void processPendingMessages(CurrentStateOutput currentState) {
-    Map<String, Map<Partition, Map<String, Message>>> pendingMsgs = 
currentState.getPendingMessages();
-
-    for (String resource : pendingMsgs.keySet()) {
-      Map<Partition, Map<String, Message>> partitionMsgs = 
pendingMsgs.get(resource);
+  public void processPendingMessages(ResourceControllerDataProvider cache,
+      CurrentStateOutput currentState, Map<String, Resource> resourceMap,
+      WagedResourceWeightsProvider weightProvider) {
+
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
+      String resName = resourceEntry.getKey();
+      Resource resource = resourceEntry.getValue();
+      // list of partitions in the resource
+      Collection<Partition> partitions = resource.getPartitions();
+      // State model definition for the resource
+      StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null) {
+        LOG.warn("State Model Definition for resource: " + resName + " is 
null");
+        continue;
+      }
+      Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
 
-      for (Partition partition : partitionMsgs.keySet()) {
+      for (Partition partition : partitions) {
         String partitionName = partition.getPartitionName();
-
         // Get Partition Weight
-        Map<String, Integer> partCapacity = getPartitionCapacity(resource, 
partitionName);
-
-        // TODO - check
-        Map<String, Message> msgs = partitionMsgs.get(partition);
-        // TODO - Check
-        for (String instance : msgs.keySet()) {
-           reduceAvailableInstanceCapacity(instance, partCapacity);
+        Map<String, Integer> partCapacity = 
weightProvider.getPartitionWeights(resName, partitionName);
+
+        // Get the pending messages for the partition
+        Map<String, Message> pendingMessages = 
currentState.getPendingMessageMap(resName, partition);
+        if (pendingMessages != null && !pendingMessages.isEmpty()) {
+          for (Map.Entry<String, Message> entry :  pendingMessages.entrySet()) 
{
+            String instance = entry.getKey();
+            if (isPartitionInAllocatedMap(instance, resName, partitionName)) {
+              continue;
+            }
+            Message msg = entry.getValue();
+            if (statePriorityMap.get(msg.getFromState()) < 
statePriorityMap.get(msg.getToState())
+                && msg.getToState().equals(stateModelDef.getInitialState())

Review Comment:
   I cannot remember for certain, but I am not sure whether this can cause 
double counting. When there is an OFFLINE -> STANDBY message, the current state 
is already OFFLINE, so the capacity would be reduced by both the pending message 
and the current state.
   
   I remember the order in which state transitions are created for a state 
model with an OFFLINE state, but I cannot remember when we write it back to ZK...



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java:
##########
@@ -42,79 +49,102 @@ public class WagedInstanceCapacity implements 
InstanceCapacityDataProvider {
 
   // Available Capacity per Instance
   private final Map<String, Map<String, Integer>> _instanceCapacityMap;
-  private final ResourceControllerDataProvider _cache;
+  private final Map<String, Map<String, Set<String>>> _allocatedPartitionsMap;
 
   public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) {
-    _cache = clusterData;
     _instanceCapacityMap = new HashMap<>();
-
-    ClusterConfig clusterConfig = _cache.getClusterConfig();
-    for (InstanceConfig instanceConfig : 
_cache.getInstanceConfigMap().values()) {
-      Map<String, Integer> instanceCapacity =
-        WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, 
instanceConfig);
+    _allocatedPartitionsMap = new HashMap<>();
+    ClusterConfig clusterConfig = clusterData.getClusterConfig();
+    for (InstanceConfig instanceConfig : 
clusterData.getInstanceConfigMap().values()) {
+      Map<String, Integer> instanceCapacity = 
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, 
instanceConfig);
       _instanceCapacityMap.put(instanceConfig.getInstanceName(), 
instanceCapacity);
+
+      _allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new 
HashMap<>());
     }
   }
 
-  /**
-   * Create Default Capacity Map.
-   * This is a utility method to create a default capacity map matching 
instance capacity map for participants.
-   * This is required as non-WAGED partitions will be placed on same instance 
and we don't know their actual capacity.
-   * This will generate default values of 0 for all the capacity keys.
-   */
-  private Map<String, Integer> createDefaultParticipantWeight() {
-    // copy the value of first Instance capacity.
-    Map<String, Integer> partCapacity = new 
HashMap<>(_instanceCapacityMap.values().iterator().next());
+  // Helper methods.
+  private boolean isPartitionInAllocatedMap(String instance, String resource, 
String partition) {
+    return _allocatedPartitionsMap.get(instance).containsKey(resource)
+        && 
_allocatedPartitionsMap.get(instance).get(resource).contains(partition);
+  }
 
-    // Set the value of all capacity to -1.
-    for (String key : partCapacity.keySet()) {
-      partCapacity.put(key, -1);
-    }
-    return partCapacity;
+  public void process(ResourceControllerDataProvider cache, CurrentStateOutput 
currentStateOutput,
+      Map<String, Resource> resourceMap, WagedResourceWeightsProvider 
weightProvider) {
+    processPendingMessages(cache, currentStateOutput, resourceMap, 
weightProvider);
+    processCurrentState(cache, currentStateOutput, resourceMap, 
weightProvider);

Review Comment:
   I would suggest changing the order, because current states represent the 
partitions that have already taken capacity. Logically, pending messages 
represent capacity that is still in the process of being taken.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to