junkaixue commented on code in PR #2546:
URL: https://github.com/apache/helix/pull/2546#discussion_r1248673105
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java:
##########
@@ -42,79 +49,102 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
// Available Capacity per Instance
private final Map<String, Map<String, Integer>> _instanceCapacityMap;
- private final ResourceControllerDataProvider _cache;
+ private final Map<String, Map<String, Set<String>>> _allocatedPartitionsMap;
public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) {
- _cache = clusterData;
_instanceCapacityMap = new HashMap<>();
-
- ClusterConfig clusterConfig = _cache.getClusterConfig();
- for (InstanceConfig instanceConfig :
_cache.getInstanceConfigMap().values()) {
- Map<String, Integer> instanceCapacity =
- WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig,
instanceConfig);
+ _allocatedPartitionsMap = new HashMap<>();
+ ClusterConfig clusterConfig = clusterData.getClusterConfig();
+ for (InstanceConfig instanceConfig :
clusterData.getInstanceConfigMap().values()) {
+ Map<String, Integer> instanceCapacity =
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig,
instanceConfig);
_instanceCapacityMap.put(instanceConfig.getInstanceName(),
instanceCapacity);
+
+ _allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new
HashMap<>());
}
}
- /**
- * Create Default Capacity Map.
- * This is a utility method to create a default capacity map matching
instance capacity map for participants.
- * This is required as non-WAGED partitions will be placed on same instance
and we don't know their actual capacity.
- * This will generate default values of 0 for all the capacity keys.
- */
- private Map<String, Integer> createDefaultParticipantWeight() {
- // copy the value of first Instance capacity.
- Map<String, Integer> partCapacity = new
HashMap<>(_instanceCapacityMap.values().iterator().next());
+ // Helper methods.
+ private boolean isPartitionInAllocatedMap(String instance, String resource,
String partition) {
+ return _allocatedPartitionsMap.get(instance).containsKey(resource)
+ &&
_allocatedPartitionsMap.get(instance).get(resource).contains(partition);
+ }
- // Set the value of all capacity to -1.
- for (String key : partCapacity.keySet()) {
- partCapacity.put(key, -1);
- }
- return partCapacity;
+ public void process(ResourceControllerDataProvider cache, CurrentStateOutput
currentStateOutput,
+ Map<String, Resource> resourceMap, WagedResourceWeightsProvider
weightProvider) {
+ processPendingMessages(cache, currentStateOutput, resourceMap,
weightProvider);
+ processCurrentState(cache, currentStateOutput, resourceMap,
weightProvider);
}
/**
* Process the pending messages based on the Current states
* @param currentState - Current state of the resources.
*/
- public void processPendingMessages(CurrentStateOutput currentState) {
- Map<String, Map<Partition, Map<String, Message>>> pendingMsgs =
currentState.getPendingMessages();
-
- for (String resource : pendingMsgs.keySet()) {
- Map<Partition, Map<String, Message>> partitionMsgs =
pendingMsgs.get(resource);
+ public void processPendingMessages(ResourceControllerDataProvider cache,
+ CurrentStateOutput currentState, Map<String, Resource> resourceMap,
+ WagedResourceWeightsProvider weightProvider) {
+
+ for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
+ String resName = resourceEntry.getKey();
+ Resource resource = resourceEntry.getValue();
+ // list of partitions in the resource
+ Collection<Partition> partitions = resource.getPartitions();
+ // State model definition for the resource
+ StateModelDefinition stateModelDef =
cache.getStateModelDef(resource.getStateModelDefRef());
+ if (stateModelDef == null) {
+ LOG.warn("State Model Definition for resource: " + resName + " is
null");
+ continue;
+ }
+ Map<String, Integer> statePriorityMap =
stateModelDef.getStatePriorityMap();
- for (Partition partition : partitionMsgs.keySet()) {
+ for (Partition partition : partitions) {
String partitionName = partition.getPartitionName();
-
// Get Partition Weight
- Map<String, Integer> partCapacity = getPartitionCapacity(resource,
partitionName);
-
- // TODO - check
- Map<String, Message> msgs = partitionMsgs.get(partition);
- // TODO - Check
- for (String instance : msgs.keySet()) {
- reduceAvailableInstanceCapacity(instance, partCapacity);
+ Map<String, Integer> partCapacity =
weightProvider.getPartitionWeights(resName, partitionName);
+
+ // Get the pending messages for the partition
+ Map<String, Message> pendingMessages =
currentState.getPendingMessageMap(resName, partition);
+ if (pendingMessages != null && !pendingMessages.isEmpty()) {
+ for (Map.Entry<String, Message> entry : pendingMessages.entrySet())
{
+ String instance = entry.getKey();
+ if (isPartitionInAllocatedMap(instance, resName, partitionName)) {
+ continue;
+ }
+ Message msg = entry.getValue();
+ if (statePriorityMap.get(msg.getFromState()) <
statePriorityMap.get(msg.getToState())
+ && msg.getToState().equals(stateModelDef.getInitialState())
Review Comment:
I am not sure, but I think this could cause double counting. When there is
an OFFLINE -> STANDBY message, the current state is already OFFLINE, so the
capacity would be reduced by both the pending message and the current
state.
I say this because I remember the order in which the state transition is
created for a state model with the OFFLINE state, but I cannot remember when
we write it back to ZK...
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java:
##########
@@ -42,79 +49,102 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
// Available Capacity per Instance
private final Map<String, Map<String, Integer>> _instanceCapacityMap;
- private final ResourceControllerDataProvider _cache;
+ private final Map<String, Map<String, Set<String>>> _allocatedPartitionsMap;
public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) {
- _cache = clusterData;
_instanceCapacityMap = new HashMap<>();
-
- ClusterConfig clusterConfig = _cache.getClusterConfig();
- for (InstanceConfig instanceConfig :
_cache.getInstanceConfigMap().values()) {
- Map<String, Integer> instanceCapacity =
- WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig,
instanceConfig);
+ _allocatedPartitionsMap = new HashMap<>();
+ ClusterConfig clusterConfig = clusterData.getClusterConfig();
+ for (InstanceConfig instanceConfig :
clusterData.getInstanceConfigMap().values()) {
+ Map<String, Integer> instanceCapacity =
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig,
instanceConfig);
_instanceCapacityMap.put(instanceConfig.getInstanceName(),
instanceCapacity);
+
+ _allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new
HashMap<>());
}
}
- /**
- * Create Default Capacity Map.
- * This is a utility method to create a default capacity map matching
instance capacity map for participants.
- * This is required as non-WAGED partitions will be placed on same instance
and we don't know their actual capacity.
- * This will generate default values of 0 for all the capacity keys.
- */
- private Map<String, Integer> createDefaultParticipantWeight() {
- // copy the value of first Instance capacity.
- Map<String, Integer> partCapacity = new
HashMap<>(_instanceCapacityMap.values().iterator().next());
+ // Helper methods.
+ private boolean isPartitionInAllocatedMap(String instance, String resource,
String partition) {
+ return _allocatedPartitionsMap.get(instance).containsKey(resource)
+ &&
_allocatedPartitionsMap.get(instance).get(resource).contains(partition);
+ }
- // Set the value of all capacity to -1.
- for (String key : partCapacity.keySet()) {
- partCapacity.put(key, -1);
- }
- return partCapacity;
+ public void process(ResourceControllerDataProvider cache, CurrentStateOutput
currentStateOutput,
+ Map<String, Resource> resourceMap, WagedResourceWeightsProvider
weightProvider) {
+ processPendingMessages(cache, currentStateOutput, resourceMap,
weightProvider);
+ processCurrentState(cache, currentStateOutput, resourceMap,
weightProvider);
Review Comment:
I would suggest changing the order, because current states represent the
partitions that have already taken capacity, whereas pending messages
logically represent capacity that is still in the process of being taken.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]