[GitHub] [helix] pkuwm commented on a change in pull request #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.
pkuwm commented on a change in pull request #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.
URL: https://github.com/apache/helix/pull/357#discussion_r307947633

## File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java

@@ -115,14 +155,58 @@ public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor c
     this(admin, accessor, propertyStore, clusterName);
   }

+  public TaskDriver(HelixAdmin admin,
+      HelixDataAccessor accessor,
+      HelixPropertyStore propertyStore,
+      String clusterName) {
+    this(admin, accessor, propertyStore, clusterName, DEFAULT_POOL_SIZE);
+  }
+
   public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor,
-      HelixPropertyStore propertyStore, String clusterName) {
+      HelixPropertyStore propertyStore, String clusterName, int poolSize) {
     _admin = admin;
     _accessor = accessor;
     _propertyStore = propertyStore;
     _clusterName = clusterName;
+    _poolSize = poolSize;
+  }
+
+  /**
+   * Start TaskDriver's thread-pool.
+   */
+  public void startPool() {

Review comment: I have been thinking about this. It might be better to let callers pass in their own thread pool. The caller would then own the thread pool, creating and shutting it down itself, and could size it based on its needs. The TaskDriver constructor also wouldn't have to change its parameters. We would offer the methods below:

```java
public ExecutorService getExecutorService() {}
public void setExecutorService(ExecutorService pool) {}
```

@i3wangyi @jiajunwang @narendly How do you like this idea?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
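The caller-owned pool proposed above could look roughly like the sketch below. This is a hypothetical illustration of the suggestion, not actual TaskDriver code; the class name, `_executorService` field, and `DEFAULT_POOL_SIZE` value are assumptions:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the proposal: the caller owns the pool's
// lifecycle; the driver only lazily creates a default pool if none was set.
public class TaskDriverPoolSketch {
  private static final int DEFAULT_POOL_SIZE = 4; // assumed default
  private ExecutorService _executorService;

  // The caller injects its own pool and remains responsible for
  // shutting it down when it is done with the driver.
  public synchronized void setExecutorService(ExecutorService pool) {
    _executorService = pool;
  }

  // Falls back to an internally created fixed-size pool when the
  // caller never set one.
  public synchronized ExecutorService getExecutorService() {
    if (_executorService == null) {
      _executorService = Executors.newFixedThreadPool(DEFAULT_POOL_SIZE);
    }
    return _executorService;
  }
}
```

With this shape, the existing constructors keep their parameter lists and no separate `startPool()` step is needed.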
[GitHub] [helix] kaisun2000 opened a new issue #364: Fix bugs in RoutingTableProvider metrics and enhance logs to record time spent
kaisun2000 opened a new issue #364: Fix bugs in RoutingTableProvider metrics and enhance logs to record time spent
URL: https://github.com/apache/helix/issues/364

**Issues:**
- When CurrentStateCache updates its snapshot, it misses all existing partitions that have state changes.
- The RoutingTableProvider callback runs on the main event thread, and the time spent there is not accounted for in the logs.
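For the second issue, recording the time spent in the callback could be done along these lines. This is an illustrative sketch only, not the actual RoutingTableProvider code; the class and method names are invented:

```java
// Illustrative sketch: wrap the callback body with a timer so the log
// line records how long the work on the event thread actually took.
public class CallbackTimingSketch {
  public static long timeCallback(Runnable callback) {
    long start = System.currentTimeMillis();
    callback.run();
    long elapsed = System.currentTimeMillis() - start;
    // In real code this would go through the class logger (e.g. SLF4J)
    // rather than stdout.
    System.out.println("Callback took " + elapsed + " ms");
    return elapsed;
  }
}
```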
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307943700 ## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterDataProvider.java ## @@ -0,0 +1,146 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * The data provider generates the Cluster Model based on the controller's data cache. + */ +public class ClusterDataProvider { + /** + * @param dataProvider The controller's data cache. 
+ * @param resourceMap The full list of the resources to be rebalanced. Note that any + * resources that are not in this list will be removed from the + * final assignment. + * @param activeInstances The active instances that will be used in the calculation. + * Note this list can be different from the real active node list + * according to the rebalancer logic. + * @param clusterChanges All the cluster changes that happened after the previous rebalance. + * @param baselineAssignment The persisted Baseline assignment. + * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the + * previous rebalance. + * @return The cluster model as the input for the upcoming rebalance. + */ + public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider, + Map resourceMap, Set activeInstances, + Map> clusterChanges, + Map baselineAssignment, Map bestPossibleAssignment) { +// Initialize all assignable replicas according to the input +Set assignableReplicas = +generateAssignableReplicas(dataProvider, resourceMap, activeInstances.size(), +bestPossibleAssignment, clusterChanges); + +// TODO split the to-be-assigned replications to separate list. +Map> assignedReplicaMap = new HashMap<>(); + +// Construct cluster context. +ClusterContext context = new ClusterContext(assignableReplicas, activeInstances.size()); + +// Construct all the assignable nodes and initialize with the confirmed assignment. +Set assignableNodes = activeInstances.stream().map( +instanceName -> new AssignableNode(dataProvider, instanceName, +assignedReplicaMap.getOrDefault(instanceName, Collections.emptySet( +.collect(Collectors.toSet()); + +// Initialize the cluster context object with the confirmed assignments.
+Map>> assignmentForFaultZoneMap = new HashMap<>(); +assignableNodes.stream() +.forEach(node -> addNodeAssignmentsToFaultZone(node, assignmentForFaultZoneMap)); +context.setAssignmentForFaultZoneMap(assignmentForFaultZoneMap); + +return new ClusterModel(context, assignableReplicas, assignableNodes, baselineAssignment, +bestPossibleAssignment); + } + + /** + * Find all the replications that need to be reallocated. + * + * @param dataProvider The cluster status cache that contains the current cluster status. + * @param resourceMapAll the valid resources that are managed by the rebalancer. + * @param bestPossibleAssignment The persisted Best Possible State. + * @param clusterChanges All the cluster changes that happened after the previous rebalance. + * @return A set of assignable replications that need reallocation. + */ + private static Set
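The `assignmentForFaultZoneMap` populated above is a nested map of fault zone to resource to partition names (the generic parameters were stripped by the mail archive). A standalone sketch of that grouping, simplified to plain strings rather than `AssignableNode` objects:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified sketch of an addNodeAssignmentsToFaultZone-style helper:
// record that a partition of a resource is hosted in a given fault zone.
public class FaultZoneMapSketch {
  public static void addAssignment(Map<String, Map<String, Set<String>>> zoneMap,
      String zone, String resource, String partition) {
    // computeIfAbsent creates and registers each missing inner container
    // in one call, so the nested structure never needs explicit null checks.
    zoneMap.computeIfAbsent(zone, z -> new HashMap<>())
        .computeIfAbsent(resource, r -> new HashSet<>())
        .add(partition);
  }
}
```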
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307940906 ## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java ## @@ -19,10 +19,290 @@ * under the License. */ +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.max; + /** - * A placeholder before we have the implementation. - * - * This class represents a potential allocation of the replication. - * Note that AssignableNode is not thread safe. + * This class represents a possible allocation of the replication. + * Note that any usage updates to the AssignableNode are not thread safe. 
*/ -public class AssignableNode { } +public class AssignableNode { + private static final Logger _logger = LoggerFactory.getLogger(AssignableNode.class.getName()); + + // basic node information + private final String _instanceName; + private Set _instanceTags; + private String _faultZone; + private Map> _disabledPartitionsMap; + private Map _maxCapacity; + private int _maxPartition; + + // proposed assignment tracking + // + private Map> _currentAssignments; + // + private Map> _currentTopStateAssignments; + // + private Map _currentCapacity; + // runtime usage tracking + private int _totalReplicaAssignmentCount; + private float _highestCapacityUtilization; + + AssignableNode(ResourceControllerDataProvider clusterCache, String instanceName, + Collection existingAssignment) { +_instanceName = instanceName; +refresh(clusterCache, existingAssignment); + } + + private void reset() { +_currentAssignments = new HashMap<>(); +_currentTopStateAssignments = new HashMap<>(); +_currentCapacity = new HashMap<>(); +_totalReplicaAssignmentCount = 0; +_highestCapacityUtilization = 0; + } + + /** + * Update the node with a ClusterDataCache. This resets the current assignment and recalculate currentCapacity. + * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be + * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could + * subject to changes. If the assumption is no longer true, this function should become private. + * + * @param clusterCache - the current cluster cache to initial the AssignableNode. 
+ */ + private void refresh(ResourceControllerDataProvider clusterCache, + Collection existingAssignment) { +reset(); + +InstanceConfig instanceConfig = clusterCache.getInstanceConfigMap().get(_instanceName); +ClusterConfig clusterConfig = clusterCache.getClusterConfig(); + +_currentCapacity.putAll(instanceConfig.getInstanceCapacityMap()); +_faultZone = computeFaultZone(clusterConfig, instanceConfig); +_instanceTags = new HashSet<>(instanceConfig.getTags()); +_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap(); +_maxCapacity = instanceConfig.getInstanceCapacityMap(); +_maxPartition = clusterConfig.getMaxPartitionsPerInstance(); + +assignNewBatch(existingAssignment); + } + + /** + * Assign a replica to the node. + * + * @param assignableReplica - the replica to be assigned + */ + void assign(AssignableReplica assignableReplica) { +if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) { + throw new HelixException(String + .format("Resource {} already has a replica from partition {} on this node", + assignableReplica.getResourceName(), assignableReplica.getPartitionName())); +} else { + if (assignableReplica.isReplicaTopState()) { +addToAssignmentRecord(assignableReplica, _currentTopStateAssignments); + } + _totalReplicaAssignmentCount += 1; + assignableReplica.getCapacity().entrySet().stream() + .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), entry.getValue())); +} + } + + /** + * Release a replica from the node. + * If the replication is not on this node, the assignable node is not updated. + * + * @param assignableReplica - the replica to be released + */ + void release(AssignableReplica assignableReplica) throws IllegalArgumentException { +String resourceName = assignableReplica.getResourceName(); +String partitionName = assignableReplica.getPartitionName(); +if (!_currentAssignments.containsKey(resourceName)) { + _logger.warn("Resource " + resourceName +
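One detail worth noting in the quoted `assign` method: the exception message passes SLF4J-style `{}` placeholders to `String.format`, which does not substitute them; `String.format` needs `%s` conversions. A minimal demonstration of the difference:

```java
public class FormatPlaceholderDemo {
  public static void main(String[] args) {
    // SLF4J-style braces are left untouched by String.format;
    // the extra arguments are silently ignored.
    String wrong = String.format("Resource {} already has a replica from partition {}",
        "myDB", "myDB_0");
    // %s conversions are actually substituted.
    String right = String.format("Resource %s already has a replica from partition %s",
        "myDB", "myDB_0");
    System.out.println(wrong);
    System.out.println(right);
  }
}
```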
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307940435 ## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307940965 ## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307943962 ## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java ## @@ -19,9 +19,129 @@ * under the License. */ +import org.apache.helix.HelixException; +import org.apache.helix.model.IdealState; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + /** - * A placeholder before we have the implementation. - * * This class wraps the required input for the rebalance algorithm. */ -public class ClusterModel { } +public class ClusterModel { + private final ClusterContext _clusterContext; + // >> + private final Map> _assignableReplicaIndex; + private final Map _assignableNodeMap; + + // Records about the previous assignment + private final Map _baselineAssignment; + private final Map _bestPossibleAssignment; + + /** + * @param clusterContext The initialized cluster context. + * @param assignableReplicas The replications to be assigned. + * Note that the replicas in this list shall not be included while initializing the context and assignable nodes. + * @param assignableNodesThe active instances. + * @param baselineAssignment The recorded baseline assignment. + * @param bestPossibleAssignment The current best possible assignment. + */ + ClusterModel(ClusterContext clusterContext, Set assignableReplicas, + Set assignableNodes, Map baselineAssignment, + Map bestPossibleAssignment) { +_clusterContext = clusterContext; + +// Index all the replicas to be assigned. +_assignableReplicaIndex = new HashMap<>(); +assignableReplicas.stream().forEach(rep -> { + Map replicaMap = + _assignableReplicaIndex.getOrDefault(rep.getResourceName(), new HashMap<>()); + replicaMap.put(rep.getReplicaKey(), rep); Review comment: `computeIfAbsent`? 
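The `computeIfAbsent` suggestion above targets the quoted `getOrDefault` + `put` pattern: `getOrDefault` returns a freshly created inner map without storing it in `_assignableReplicaIndex`, so a separate put-back is needed (and is not visible in the excerpt), while `computeIfAbsent` creates and registers the inner map in one call. A small sketch of the difference, simplified to string maps:

```java
import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentSketch {
  public static void main(String[] args) {
    Map<String, Map<String, String>> index = new HashMap<>();

    // getOrDefault hands back a new inner map but does NOT store it in
    // `index`; an explicit index.put(...) is required as a second step.
    Map<String, String> inner = index.getOrDefault("myDB", new HashMap<>());
    inner.put("myDB_0", "replica");
    index.put("myDB", inner);

    // computeIfAbsent creates AND registers the inner map in one call.
    index.computeIfAbsent("myOtherDB", k -> new HashMap<>()).put("myOtherDB_0", "replica");

    System.out.println(index.keySet());
  }
}
```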
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307935935 ## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java ## @@ -19,9 +19,124 @@ * under the License. */ +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; + +import java.io.IOException; +import java.util.Map; + /** - * A placeholder before we have the implementation. - * * This class represents a partition replication that needs to be allocated. */ -public class AssignableReplica { } +public class AssignableReplica implements Comparable { + private final String _partitionName; + private final String _resourceName; + private final String _resourceInstanceGroupTag; + private final int _resourceMaxPartitionsPerInstance; + private final Map _capacityUsage; + // The priority of the replica's state + private final int _statePriority; + // The state of the replica + private final String _replicaState; + + /** + * @param resourceConfig The resource config for the resource which contains the replication. + * @param partitionName The replication's partition name. + * @param replicaState The state of the replication. + * @param statePriority The priority of the replication's state. 
+ */ + AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState, + int statePriority) { +_partitionName = partitionName; +_replicaState = replicaState; +_statePriority = statePriority; +_resourceName = resourceConfig.getResourceName(); +_capacityUsage = fetchCapacityUsage(partitionName, resourceConfig); +_resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag(); +_resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance(); + } + + public Map getCapacity() { +return _capacityUsage; + } + + public String getPartitionName() { +return _partitionName; + } + + public String getReplicaState() { +return _replicaState; + } + + public boolean isReplicaTopState() { +return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY; + } + + public int getStatePriority() { +return _statePriority; + } + + public String getResourceName() { +return _resourceName; + } + + public String getResourceInstanceGroupTag() { +return _resourceInstanceGroupTag; + } + + public Integer getResourceMaxPartitionsPerInstance() { Review comment: Checked callers and assignments which are all using `int`, it seems there is no need to return an `Integer` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
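On the `Integer` versus `int` point in the review comment above: a getter backed by a primitive field that declares a boxed return type autoboxes on every call, and invites callers to compare the results with `==`, which is only reliable for values in the small-integer cache. A short demonstration:

```java
public class BoxingPitfallDemo {
  public static void main(String[] args) {
    // Boxed values outside the guaranteed cache range (-128..127) may be
    // distinct objects, so reference comparison with == can be false.
    Integer a = 1000;
    Integer b = 1000;
    System.out.println(a.equals(b)); // value comparison is always safe

    // With primitives there is no boxing and no such trap.
    int x = 1000;
    int y = 1000;
    System.out.println(x == y);
  }
}
```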
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation URL: https://github.com/apache/helix/pull/362#discussion_r307938009 ## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java ## @@ -19,9 +19,104 @@ * under the License. */ +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + /** - * A placeholder before we have the implementation. - * - * This class tracks the global rebalance-related status of a Helix managed cluster. + * This class tracks the rebalance-related global cluster status. */ -public class ClusterContext { } +public class ClusterContext { + private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f; + + // This estimation helps to ensure global partition count evenness + private final int _estimatedMaxPartitionCount; + // This estimation helps to ensure global top state replica count evenness + private final int _estimatedMaxTopStateCount; + // This estimation helps to ensure per-resource partition count evenness + private final Map _estimatedMaxPartitionByResource = new HashMap<>(); + + // map{zoneName : map{resourceName : set(partitionNames)}} + private Map>> _assignmentForFaultZoneMap = new HashMap<>(); + + /** + * Construct the cluster context based on the current instance status. + * + * @param replicaSetAll the partition replicas that are managed by the rebalancer + * @param instanceCount The count of all the active instances that can be used to host partitions. 
+ */ + ClusterContext(Set replicaSet, int instanceCount) { +int totalReplicas = 0; +int totalTopStateReplicas = 0; + +for (Map.Entry> entry : replicaSet.stream() + .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) { + int replicas = entry.getValue().size(); + totalReplicas += replicas; + + int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount)); + _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt); + + totalTopStateReplicas += + entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count(); +} + +_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount); +_estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount); + } + + public Map>> getAssignmentForFaultZoneMap() { +return _assignmentForFaultZoneMap; + } + + public Integer getEstimatedMaxPartitionCount() { +return _estimatedMaxPartitionCount; + } + + public Integer getEstimatedMaxPartitionByResource(String resourceName) { +return _estimatedMaxPartitionByResource.get(resourceName); + } + + public Integer getEstimatedMaxTopStateCount() { Review comment: Same. Return `int`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
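The body of `estimateAvgReplicaCount` is not included in the excerpt. A plausible reconstruction, assuming the `ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT` constant of 1.1f pads the plain per-instance average and the result is rounded up so the evenness cap is never too tight (the exact formula in the PR may differ):

```java
public class ReplicaEstimateSketch {
  private static final float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;

  // Hypothetical reconstruction: average replicas per active instance,
  // padded by the error margin, rounded up to a whole replica count.
  public static int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
    return (int) Math.ceil(
        (float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
  }

  public static void main(String[] args) {
    // e.g. 95 replicas across 10 instances: ceil(9.5 * 1.1) = 11 per instance
    System.out.println(estimateAvgReplicaCount(95, 10));
  }
}
```

The constructor then takes the larger of 1 and this estimate per resource, which matches the `Math.max(1, estimateAvgReplicaCount(...))` call in the quoted code.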
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307939730

## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java

@@ -19,9 +19,104 @@
  * under the License.
  */
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSet    All the partition replicas that are managed by the rebalancer
+   * @param instanceCount The count of all the active instances that can be used to host partitions.
+   */
+  ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+    int totalReplicas = 0;
+    int totalTopStateReplicas = 0;
+
+    for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+      int replicas = entry.getValue().size();
+      totalReplicas += replicas;
+
+      int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
+      _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+      totalTopStateReplicas +=
+          entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+    }
+
+    _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
+    _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
+    return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {
+    return _estimatedMaxPartitionCount;
+  }
+
+  public Integer getEstimatedMaxPartitionByResource(String resourceName) {
+    return _estimatedMaxPartitionByResource.get(resourceName);
+  }
+
+  public Integer getEstimatedMaxTopStateCount() {
+    return _estimatedMaxTopStateCount;
+  }
+
+  public Set<String> getPartitionsForResourceAndFaultZone(String faultZoneId, String resourceName) {
+    if (_assignmentForFaultZoneMap.containsKey(faultZoneId) && _assignmentForFaultZoneMap
+        .get(faultZoneId).containsKey(resourceName)) {
+      return _assignmentForFaultZoneMap.get(faultZoneId).get(resourceName);
+    }
+    return Collections.emptySet();
+  }
+
+  void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
+    if (!_assignmentForFaultZoneMap.containsKey(faultZoneId)) {
+      _assignmentForFaultZoneMap.put(faultZoneId, new HashMap<>());
+    }
+    if (!_assignmentForFaultZoneMap.get(faultZoneId).containsKey(resourceName)) {
+      _assignmentForFaultZoneMap.get(faultZoneId).put(resourceName, new HashSet<>());
+    }
+    _assignmentForFaultZoneMap.get(faultZoneId).get(resourceName).add(partition);

Review comment: For this code block, would it look cleaner if we used `computeIfAbsent()`?

```java
_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
    .computeIfAbsent(resourceName, k -> new HashSet<>())
    .add(partition);
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
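The suggested `computeIfAbsent()` refactor can be exercised standalone. The demo class below is illustrative rather than Helix code, but its method body matches the suggestion: one chained call replaces the two `containsKey()`/`put()` checks, and the `Set` still de-duplicates repeated partitions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Standalone illustration of the computeIfAbsent() refactor suggested above.
public class FaultZoneMapDemo {
    private static final Map<String, Map<String, Set<String>>> ASSIGNMENT = new HashMap<>();

    static void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
        // One chained call replaces the nested containsKey()/put() checks.
        ASSIGNMENT.computeIfAbsent(faultZoneId, k -> new HashMap<>())
            .computeIfAbsent(resourceName, k -> new HashSet<>())
            .add(partition);
    }

    static int partitionCount(String faultZoneId, String resourceName) {
        return ASSIGNMENT.getOrDefault(faultZoneId, Map.of())
            .getOrDefault(resourceName, Set.of()).size();
    }

    public static void main(String[] args) {
        addPartitionToFaultZone("zone-1", "resource-A", "partition-0");
        addPartitionToFaultZone("zone-1", "resource-A", "partition-0"); // set de-duplicates
        addPartitionToFaultZone("zone-1", "resource-A", "partition-1");
        System.out.println(partitionCount("zone-1", "resource-A")); // prints 2
    }
}
```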
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307937930

## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java

@@ -19,9 +19,104 @@
  * under the License.
  */
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSet    All the partition replicas that are managed by the rebalancer
+   * @param instanceCount The count of all the active instances that can be used to host partitions.
+   */
+  ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+    int totalReplicas = 0;
+    int totalTopStateReplicas = 0;
+
+    for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+      int replicas = entry.getValue().size();
+      totalReplicas += replicas;
+
+      int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
+      _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+      totalTopStateReplicas +=
+          entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+    }
+
+    _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
+    _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
+    return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {

Review comment: Return `int`? No need to autobox into Integer.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
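The autoboxing point in the review comment can be seen in isolation: declaring the return type as `Integer` makes the compiler wrap the primitive field in an `Integer.valueOf()` call on every invocation, and boxed values compare surprisingly with `==`. The demo class below is illustrative, not Helix code.

```java
// Small illustration of the Integer-vs-int point above: an Integer return
// type boxes the primitive on every call via Integer.valueOf().
public class BoxingDemo {
    static Integer boxed(int v) { return v; } // implicit Integer.valueOf(v)

    public static void main(String[] args) {
        // Integer.valueOf caches values in [-128, 127], so == happens to work here...
        System.out.println(boxed(100) == boxed(100));   // true (cached instance)
        // ...but outside the cache the two boxes are distinct objects.
        System.out.println(boxed(1000) == boxed(1000)); // false
    }
}
```

Returning plain `int` avoids both the per-call allocation and this comparison pitfall, which is why the reviewer asks for it when the value can never be null.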
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307936299

## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java

@@ -19,9 +19,124 @@
  * under the License.
  */
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable<AssignableReplica> {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map<String, Integer> _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
+      int statePriority) {
+    _partitionName = partitionName;
+    _replicaState = replicaState;
+    _statePriority = statePriority;
+    _resourceName = resourceConfig.getResourceName();
+    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+    _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+    _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map<String, Integer> getCapacity() {
+    return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+    return _partitionName;
+  }
+
+  public String getReplicaState() {
+    return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+    return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+    return _statePriority;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+    return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
+    return _resourceMaxPartitionsPerInstance;
+  }
+
+  public String getReplicaKey() {
+    return generateReplicaKey(_resourceName, _partitionName, _replicaState);
+  }
+
+  @Override
+  public int compareTo(AssignableReplica replica) {
+    if (!_resourceName.equals(replica._resourceName)) {
+      return _resourceName.compareTo(replica._resourceName);
+    }
+    if (!_partitionName.equals(replica._partitionName)) {
+      return _partitionName.compareTo(replica._partitionName);
+    }
+    if (!_replicaState.equals(replica._replicaState)) {
+      return _replicaState.compareTo(replica._replicaState);
+    }
+    return 0;
+  }
+
+  public static String generateReplicaKey(String resourceName, String partitioName, String state) {
+    return String.format("%s-%s-%s", resourceName, partitioName, state);
+  }
+
+  /**
+   * Parse the resource config for the partition weight.
+   */
+  private Map<String, Integer> fetchCapacityUsage(String partitionName,
+      ResourceConfig resourceConfig) {
+    Map<String, Map<String, Integer>> capacityMap;
+
+    try {
+      capacityMap = resourceConfig.getPartitionCapacityMap();
+    } catch (IOException ex) {
+      throw new IllegalArgumentException("Invalid partition capacity configuration.", ex);
+    }
+
+    Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
+    if (partitionCapacity == null) {
+      partitionCapacity.get(ResourceConfig.DEFAULT_PARTITION_KEY);

Review comment: Typo? What does this do? If partitionCapacity is null, this would throw a NullPointerException.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
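The bug flagged in this comment is a null dereference: `partitionCapacity.get(...)` is called on the very reference just found to be null, and its result is discarded anyway. A hedged sketch of what the block presumably intended is to fall back to the `DEFAULT` capacity entry when the partition has no explicit one. The class below is an illustration under that assumption, not the actual Helix fix; the map shapes mirror the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the presumed intent: fall back to the DEFAULT capacity
// entry instead of dereferencing the null partition entry.
public class CapacityFallbackSketch {
    static final String DEFAULT_PARTITION_KEY = "DEFAULT";

    static Map<String, Integer> fetchCapacityUsage(String partitionName,
            Map<String, Map<String, Integer>> capacityMap) {
        Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
        if (partitionCapacity == null) {
            // The reviewed code called partitionCapacity.get(...) here, which
            // throws a NullPointerException; reading the DEFAULT entry from
            // capacityMap is presumably what was meant.
            partitionCapacity = capacityMap.get(DEFAULT_PARTITION_KEY);
        }
        if (partitionCapacity == null) {
            throw new IllegalArgumentException(
                "No capacity configured for partition " + partitionName + " and no DEFAULT entry.");
        }
        return partitionCapacity;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> capacityMap = new HashMap<>();
        capacityMap.put(DEFAULT_PARTITION_KEY, Map.of("DISK", 10));
        // "p1" has no explicit entry, so the DEFAULT capacity is returned.
        System.out.println(fetchCapacityUsage("p1", capacityMap).get("DISK")); // prints 10
    }
}
```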
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307935935

## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java

@@ -19,9 +19,124 @@
  * under the License.
  */
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
 * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable<AssignableReplica> {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map<String, Integer> _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
+      int statePriority) {
+    _partitionName = partitionName;
+    _replicaState = replicaState;
+    _statePriority = statePriority;
+    _resourceName = resourceConfig.getResourceName();
+    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+    _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+    _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map<String, Integer> getCapacity() {
+    return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+    return _partitionName;
+  }
+
+  public String getReplicaState() {
+    return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+    return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+    return _statePriority;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+    return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {

Review comment: Checked callers and assignments; it seems there is no need to return an `Integer`?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307933787

## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java

@@ -19,10 +19,290 @@
  * under the License.
  */
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set<String> _instanceTags;
+  private String _faultZone;
+  private Map<String, List<String>> _disabledPartitionsMap;
+  private Map<String, Integer> _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  private Map<String, Set<String>> _currentAssignments;
+  private Map<String, Set<String>> _currentTopStateAssignments;
+  private Map<String, Integer> _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String instanceName,
+      Collection<AssignableReplica> existingAssignment) {
+    _instanceName = instanceName;
+    refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+    _currentAssignments = new HashMap<>();
+    _currentTopStateAssignments = new HashMap<>();
+    _currentCapacity = new HashMap<>();
+    _totalReplicaAssignmentCount = 0;
+    _highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+      Collection<AssignableReplica> existingAssignment) {
+    reset();
+
+    InstanceConfig instanceConfig = clusterCache.getInstanceConfigMap().get(_instanceName);
+    ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+    _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+    _faultZone = computeFaultZone(clusterConfig, instanceConfig);
+    _instanceTags = new HashSet<>(instanceConfig.getTags());
+    _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+    _maxCapacity = instanceConfig.getInstanceCapacityMap();
+    _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+    assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+    if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+      throw new HelixException(String
+          .format("Resource {} already has a replica from partition {} on this node",
+              assignableReplica.getResourceName(), assignableReplica.getPartitionName()));
+    } else {
+      if (assignableReplica.isReplicaTopState()) {
+        addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+      }
+      _totalReplicaAssignmentCount += 1;
+      assignableReplica.getCapacity().entrySet().stream()
+          .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), entry.getValue()));
+    }
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws IllegalArgumentException {
+    String resourceName = assignableReplica.getResourceName();
+    String partitionName = assignableReplica.getPartitionName();
+    if (!_currentAssignments.containsKey(resourceName)) {
+      _logger.warn("Resource " + resourceName +
[GitHub] [helix] jiajunwang opened a new pull request #363: Fix the race condition while Helix refresh cluster status cache.
jiajunwang opened a new pull request #363: Fix the race condition while Helix refresh cluster status cache.
URL: https://github.com/apache/helix/pull/363

### Issues
#331

### Description
The design ensures a single read to avoid locking during the change notification. However, a later update introduced an additional read. As a result, the two reads may return different results, because the notification is lock free. This can leave the cache in an inconsistent state. The impact is that the expected rebalance might not happen.

### Tests
Running

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
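The double-read race described in this PR can be illustrated in isolation: when a lock-free cache reference is read twice, a concurrent refresh between the two reads can yield values from different snapshots, while reading the reference once into a local guarantees a consistent view. The sketch below is an illustration of the general pattern, not the Helix code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustration of the race described above: two reads of a lock-free cache
// reference may observe different snapshots; one read into a local cannot.
public class DoubleReadRace {
    // The "cache": an invariant-carrying snapshot (both slots always equal).
    static final AtomicReference<int[]> CACHE = new AtomicReference<>(new int[]{1, 1});

    // Racy: each CACHE.get() may return a different snapshot if a refresh
    // lands between the two reads, so the comparison can spuriously fail.
    static boolean consistentRacy() {
        return CACHE.get()[0] == CACHE.get()[1];
    }

    // Safe: read the reference once; both values come from the same snapshot.
    static boolean consistentSafe() {
        int[] snapshot = CACHE.get();
        return snapshot[0] == snapshot[1];
    }

    public static void main(String[] args) {
        // Simulate a refresh that swaps in a new (still internally consistent) snapshot.
        CACHE.set(new int[]{2, 2});
        System.out.println(consistentSafe()); // true by construction
    }
}
```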
[GitHub] [helix] pkuwm commented on issue #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.
pkuwm commented on issue #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.
URL: https://github.com/apache/helix/pull/357#issuecomment-515564146

@i3wangyi I've created a private thread pool executor in the class. To avoid creating the thread pool every time a TaskDriver is constructed, I added a `startPool()` method to create it on demand: if `getWorkflows(long timeout)` is never called, the thread pool is never created. For the pool size, we will let the caller choose one that fits its needs. I don't use `ScheduledExecutorService` because it is designed for delaying or scheduling tasks, which we don't need. Thanks for the suggestion.

> I'm afraid the underlying executor is creating a thread pool with one worker. If this is the case, there's certainly overhead if each time the method gets invoked, the thread gets created and deleted after the method call. I don't think creating a thread every time is cheap; I would suggest making it a private field. Also, for the timeout mechanism, there's a better choice for you: https://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
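The timeout mechanism under discussion can be sketched with a plain `ExecutorService` and `Future.get(timeout)`: the potentially slow read is submitted to the pool and the wait is bounded. The class, method names, and the fake workflow read below are illustrative assumptions, not the actual TaskDriver implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hedged sketch of the getWorkflows(long timeout) shape discussed above.
public class TimedReadSketch {
    // Daemon threads so the JVM can exit even if the pool is never shut down.
    private static final ExecutorService POOL = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    static List<String> getWorkflows(long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Stand-in for the real (potentially slow) workflow read.
        Future<List<String>> future = POOL.submit(() -> Arrays.asList("wf-1", "wf-2"));
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            // Interrupt the read if it is still running after the timeout;
            // a no-op when the future already completed.
            future.cancel(true);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getWorkflows(1000).size()); // prints 2
    }
}
```

This is also why a lazily created, caller-sized pool (or a caller-supplied `ExecutorService`, as suggested in the head comment) matters: the submit/`get(timeout)` pattern reuses pool threads instead of spawning one per call.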
[GitHub] [helix] dasahcc merged pull request #360: Remove TODO NPE log for computeResourceBestPossibleState
dasahcc merged pull request #360: Remove TODO NPE log for computeResourceBestPossibleState URL: https://github.com/apache/helix/pull/360 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [helix] jiajunwang merged pull request #348: Adding the configuration items of the WAGED rebalancer.
jiajunwang merged pull request #348: Adding the configuration items of the WAGED rebalancer. URL: https://github.com/apache/helix/pull/348 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [helix] jiajunwang commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.
jiajunwang commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.
URL: https://github.com/apache/helix/pull/348#discussion_r307863842

## File path: helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java

@@ -775,17 +794,38 @@ private void validate() {
         }
       }
     }
+
+      if (_partitionCapacityMap != null) {
+        if (_partitionCapacityMap.keySet().stream()
+            .noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured without the DEFAULT capacity!");
+        }
+        if (_partitionCapacityMap.values().stream()
+            .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured with negative capacity value!");
+        }
+      }
   }

   public ResourceConfig build() {
     // TODO: Reenable the validation in the future when ResourceConfig is ready.
     // validate();
-    return new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef,

Review comment: Discussed with Yi offline. I have converted it to the preferred style. However, the builder pattern used in the resource config is still not the recommended way. We will fix it in the future.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [helix] i3wangyi commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
i3wangyi commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307853283

## File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java

@@ -19,10 +19,290 @@
  * under the License.
  */
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {

Review comment: Generally, let's order the fields by `private final` and `private`. Because `final` means the field is a necessary

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [helix] i3wangyi commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.
i3wangyi commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.
URL: https://github.com/apache/helix/pull/348#discussion_r307850803

## File path: helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java

@@ -775,17 +794,38 @@ private void validate() {
         }
       }
     }
+
+      if (_partitionCapacityMap != null) {
+        if (_partitionCapacityMap.keySet().stream()
+            .noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured without the DEFAULT capacity!");
+        }
+        if (_partitionCapacityMap.values().stream()
+            .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured with negative capacity value!");
+        }
+      }
   }

   public ResourceConfig build() {
     // TODO: Reenable the validation in the future when ResourceConfig is ready.
     // validate();
-    return new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef,

Review comment: The builder pattern usually has only one line in this method:

```java
return new ResourceConfig(this);
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
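The style suggested in this comment — the target class takes the builder itself, so `build()` stays one line — can be sketched with a minimal standalone example. The class and field names below are illustrative, not the actual ResourceConfig fields.

```java
// Minimal sketch of the builder style suggested above: the private
// constructor consumes the builder, so build() is a single line.
public class Config {
    private final String resourceId;
    private final int numPartitions;

    private Config(Builder b) {
        this.resourceId = b.resourceId;
        this.numPartitions = b.numPartitions;
    }

    @Override
    public String toString() {
        return resourceId + ":" + numPartitions;
    }

    public static class Builder {
        private String resourceId;
        private int numPartitions;

        public Builder resourceId(String id) { this.resourceId = id; return this; }
        public Builder numPartitions(int n) { this.numPartitions = n; return this; }

        public Config build() {
            return new Config(this); // one line, as suggested
        }
    }

    public static void main(String[] args) {
        System.out.println(new Builder().resourceId("db").numPartitions(8).build()); // db:8
    }
}
```

A side benefit of this style is that adding a field touches only the builder and the private constructor, not every `build()` call site.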
[GitHub] [helix] jiajunwang commented on issue #348: Adding the configuration items of the WAGED rebalancer.
jiajunwang commented on issue #348: Adding the configuration items of the WAGED rebalancer.
URL: https://github.com/apache/helix/pull/348#issuecomment-515333872

I tried to use more Java 8 streams, as @i3wangyi suggested; they are convenient. But I will not change all of the loops to streams, since some of them are too complicated. I also reviewed the code again myself and found one issue, which is fixed in the new commit. Please help review the logic as well.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services