[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-28 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307987847
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = 
LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set _instanceTags;
+  private String _faultZone;
+  private Map> _disabledPartitionsMap;
+  private Map _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  // 
+  private Map> _currentAssignments;
+  // 
+  private Map> _currentTopStateAssignments;
+  // 
+  private Map _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String 
instanceName,
+  Collection existingAssignment) {
+_instanceName = instanceName;
+refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+_currentAssignments = new HashMap<>();
+_currentTopStateAssignments = new HashMap<>();
+_currentCapacity = new HashMap<>();
+_totalReplicaAssignmentCount = 0;
+_highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also 
be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function 
should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the 
AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+  Collection existingAssignment) {
+reset();
+
+InstanceConfig instanceConfig = 
clusterCache.getInstanceConfigMap().get(_instanceName);
+ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+_currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+_faultZone = computeFaultZone(clusterConfig, instanceConfig);
+_instanceTags = new HashSet<>(instanceConfig.getTags());
+_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+_maxCapacity = instanceConfig.getInstanceCapacityMap();
+_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+  throw new HelixException(String
+  .format("Resource {} already has a replica from partition {} on this 
node",
+  assignableReplica.getResourceName(), 
assignableReplica.getPartitionName()));
+} else {
+  if (assignableReplica.isReplicaTopState()) {
+addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+  }
+  _totalReplicaAssignmentCount += 1;
+  assignableReplica.getCapacity().entrySet().stream()
+  .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), 
entry.getValue()));
+}
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not 
updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws 
IllegalArgumentException {
+String resourceName = assignableReplica.getResourceName();
+String partitionName = assignableReplica.getPartitionName();
+if (!_currentAssignments.containsKey(resourceName)) {
+  _logger.warn("Resource " + resourceName + 

[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307943700
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterDataProvider.java
 ##
 @@ -0,0 +1,146 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The data provider generates the Cluster Model based on the controller's 
data cache.
+ */
+public class ClusterDataProvider {
+  /**
+   * @param dataProvider   The controller's data cache.
+   * @param resourceMap The full list of the resources to be 
rebalanced. Note that any
+   *   resources that are not in this list will be 
removed from the
+   *   final assignment.
+   * @param activeInstances The active instances that will be used in 
the calculation.
+   *   Note this list can be different from the 
real active node list
+   *   according to the rebalancer logic.
+   * @param clusterChanges All the cluster changes that happened after 
the previous rebalance.
+   * @param baselineAssignment The persisted Baseline assignment.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that 
was generated in the
+   *   previous rebalance.
+   * @return The cluster model as the input for the upcoming rebalance.
+   */
+  public static ClusterModel 
generateClusterModel(ResourceControllerDataProvider dataProvider,
+  Map resourceMap, Set activeInstances,
+  Map> clusterChanges,
+  Map baselineAssignment, Map 
bestPossibleAssignment) {
+// Initialize all assignable replicas according to the input
+Set assignableReplicas =
+generateAssignableReplicas(dataProvider, resourceMap, 
activeInstances.size(),
+bestPossibleAssignment, clusterChanges);
+
+// TODO split the to-be-assigned replications to separate list.
+Map> assignedReplicaMap = new HashMap<>();
+
+// Construct cluster context.
+ClusterContext context = new ClusterContext(assignableReplicas, 
activeInstances.size());
+
+// Construct all the assignble nodes and initialize with the confirmed 
assignment.
+Set assignableNodes = activeInstances.stream().map(
+instanceName -> new AssignableNode(dataProvider, instanceName,
+assignedReplicaMap.getOrDefault(instanceName, 
Collections.emptySet(
+.collect(Collectors.toSet());
+
+// Initial the cluster context object with the confirmed assignments.
+Map>> assignmentForFaultZoneMap = new 
HashMap<>();
+assignableNodes.stream()
+.forEach(node -> addNodeAssignmentsToFaultZone(node, 
assignmentForFaultZoneMap));
+context.setAssignmentForFaultZoneMap(assignmentForFaultZoneMap);
+
+return new ClusterModel(context, assignableReplicas, assignableNodes, 
baselineAssignment,
+bestPossibleAssignment);
+  }
+
+  /**
+   * Find all the replications that need to be reallocated.
+   *
+   * @param dataProvider   The cluster status cache that contains the 
current cluster status.
+   * @param resourceMap All the valid resources that are managed by 
the rebalancer.
+   * @param bestPossibleAssignment The persisted Best Possible State.
+   * @param clusterChanges All the cluster changes that happened after 
the previous rebalance.
+   * @return A set of assignable replications that need reallocation.
+   */
+  private static Set 

[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307940906
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = 
LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set _instanceTags;
+  private String _faultZone;
+  private Map> _disabledPartitionsMap;
+  private Map _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  // 
+  private Map> _currentAssignments;
+  // 
+  private Map> _currentTopStateAssignments;
+  // 
+  private Map _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String 
instanceName,
+  Collection existingAssignment) {
+_instanceName = instanceName;
+refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+_currentAssignments = new HashMap<>();
+_currentTopStateAssignments = new HashMap<>();
+_currentCapacity = new HashMap<>();
+_totalReplicaAssignmentCount = 0;
+_highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also 
be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function 
should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the 
AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+  Collection existingAssignment) {
+reset();
+
+InstanceConfig instanceConfig = 
clusterCache.getInstanceConfigMap().get(_instanceName);
+ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+_currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+_faultZone = computeFaultZone(clusterConfig, instanceConfig);
+_instanceTags = new HashSet<>(instanceConfig.getTags());
+_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+_maxCapacity = instanceConfig.getInstanceCapacityMap();
+_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+  throw new HelixException(String
+  .format("Resource {} already has a replica from partition {} on this 
node",
+  assignableReplica.getResourceName(), 
assignableReplica.getPartitionName()));
+} else {
+  if (assignableReplica.isReplicaTopState()) {
+addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+  }
+  _totalReplicaAssignmentCount += 1;
+  assignableReplica.getCapacity().entrySet().stream()
+  .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), 
entry.getValue()));
+}
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not 
updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws 
IllegalArgumentException {
+String resourceName = assignableReplica.getResourceName();
+String partitionName = assignableReplica.getPartitionName();
+if (!_currentAssignments.containsKey(resourceName)) {
+  _logger.warn("Resource " + resourceName + 

[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307940435
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = 
LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set _instanceTags;
+  private String _faultZone;
+  private Map> _disabledPartitionsMap;
+  private Map _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  // 
+  private Map> _currentAssignments;
+  // 
+  private Map> _currentTopStateAssignments;
+  // 
+  private Map _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String 
instanceName,
+  Collection existingAssignment) {
+_instanceName = instanceName;
+refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+_currentAssignments = new HashMap<>();
+_currentTopStateAssignments = new HashMap<>();
+_currentCapacity = new HashMap<>();
+_totalReplicaAssignmentCount = 0;
+_highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also 
be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function 
should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the 
AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+  Collection existingAssignment) {
+reset();
+
+InstanceConfig instanceConfig = 
clusterCache.getInstanceConfigMap().get(_instanceName);
+ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+_currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+_faultZone = computeFaultZone(clusterConfig, instanceConfig);
+_instanceTags = new HashSet<>(instanceConfig.getTags());
+_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+_maxCapacity = instanceConfig.getInstanceCapacityMap();
+_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+  throw new HelixException(String
+  .format("Resource {} already has a replica from partition {} on this 
node",
+  assignableReplica.getResourceName(), 
assignableReplica.getPartitionName()));
+} else {
+  if (assignableReplica.isReplicaTopState()) {
+addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+  }
+  _totalReplicaAssignmentCount += 1;
+  assignableReplica.getCapacity().entrySet().stream()
+  .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), 
entry.getValue()));
+}
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not 
updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws 
IllegalArgumentException {
+String resourceName = assignableReplica.getResourceName();
+String partitionName = assignableReplica.getPartitionName();
+if (!_currentAssignments.containsKey(resourceName)) {
+  _logger.warn("Resource " + resourceName + 

[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307940965
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = 
LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set _instanceTags;
+  private String _faultZone;
+  private Map> _disabledPartitionsMap;
+  private Map _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  // 
+  private Map> _currentAssignments;
+  // 
+  private Map> _currentTopStateAssignments;
+  // 
+  private Map _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String 
instanceName,
+  Collection existingAssignment) {
+_instanceName = instanceName;
+refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+_currentAssignments = new HashMap<>();
+_currentTopStateAssignments = new HashMap<>();
+_currentCapacity = new HashMap<>();
+_totalReplicaAssignmentCount = 0;
+_highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also 
be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function 
should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the 
AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+  Collection existingAssignment) {
+reset();
+
+InstanceConfig instanceConfig = 
clusterCache.getInstanceConfigMap().get(_instanceName);
+ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+_currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+_faultZone = computeFaultZone(clusterConfig, instanceConfig);
+_instanceTags = new HashSet<>(instanceConfig.getTags());
+_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+_maxCapacity = instanceConfig.getInstanceCapacityMap();
+_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+  throw new HelixException(String
+  .format("Resource {} already has a replica from partition {} on this 
node",
+  assignableReplica.getResourceName(), 
assignableReplica.getPartitionName()));
+} else {
+  if (assignableReplica.isReplicaTopState()) {
+addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+  }
+  _totalReplicaAssignmentCount += 1;
+  assignableReplica.getCapacity().entrySet().stream()
+  .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), 
entry.getValue()));
+}
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not 
updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws 
IllegalArgumentException {
+String resourceName = assignableReplica.getResourceName();
+String partitionName = assignableReplica.getPartitionName();
+if (!_currentAssignments.containsKey(resourceName)) {
+  _logger.warn("Resource " + resourceName + 

[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307943962
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
 ##
 @@ -19,9 +19,129 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import org.apache.helix.model.IdealState;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class wraps the required input for the rebalance algorithm.
  */
-public class ClusterModel { }
+public class ClusterModel {
+  private final ClusterContext _clusterContext;
+  // >>
+  private final Map> 
_assignableReplicaIndex;
+  private final Map _assignableNodeMap;
+
+  // Records about the previous assignment
+  private final Map _baselineAssignment;
+  private final Map _bestPossibleAssignment;
+
+  /**
+   * @param clusterContext The initialized cluster context.
+   * @param assignableReplicas The replications to be assigned.
+   *   Note that the replicas in this list shall 
not be included while initializing the context and assignable nodes.
+   * @param assignableNodes The active instances.
+   * @param baselineAssignment The recorded baseline assignment.
+   * @param bestPossibleAssignment The current best possible assignment.
+   */
+  ClusterModel(ClusterContext clusterContext, Set 
assignableReplicas,
+  Set assignableNodes, Map 
baselineAssignment,
+  Map bestPossibleAssignment) {
+_clusterContext = clusterContext;
+
+// Index all the replicas to be assigned.
+_assignableReplicaIndex = new HashMap<>();
+assignableReplicas.stream().forEach(rep -> {
+  Map replicaMap =
+  _assignableReplicaIndex.getOrDefault(rep.getResourceName(), new 
HashMap<>());
+  replicaMap.put(rep.getReplicaKey(), rep);
 
 Review comment:
   Could this use `computeIfAbsent` instead of `getOrDefault`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307935935
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 ##
 @@ -19,9 +19,124 @@
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains 
the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, 
String replicaState,
+  int statePriority) {
+_partitionName = partitionName;
+_replicaState = replicaState;
+_statePriority = statePriority;
+_resourceName = resourceConfig.getResourceName();
+_capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+_resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+_resourceMaxPartitionsPerInstance = 
resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map getCapacity() {
+return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+return _partitionName;
+  }
+
+  public String getReplicaState() {
+return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+return _statePriority;
+  }
+
+  public String getResourceName() {
+return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
 
 Review comment:
   I checked the callers and assignments, and they all use `int`, so it seems 
there is no need to return an `Integer` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307938009
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 ##
 @@ -19,9 +19,104 @@
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed 
cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map _estimatedMaxPartitionByResource = new 
HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map>> _assignmentForFaultZoneMap = 
new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSet All the partition replicas that are managed by the 
rebalancer
+   * @param instanceCount The count of all the active instances that can be 
used to host partitions.
+   */
+  ClusterContext(Set replicaSet, int instanceCount) {
+int totalReplicas = 0;
+int totalTopStateReplicas = 0;
+
+for (Map.Entry> entry : replicaSet.stream()
+
.collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) 
{
+  int replicas = entry.getValue().size();
+  totalReplicas += replicas;
+
+  int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, 
instanceCount));
+  _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+  totalTopStateReplicas +=
+  
entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+}
+
+_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, 
instanceCount);
+_estimatedMaxTopStateCount = 
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map>> getAssignmentForFaultZoneMap() {
+return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {
+return _estimatedMaxPartitionCount;
+  }
+
+  public Integer getEstimatedMaxPartitionByResource(String resourceName) {
+return _estimatedMaxPartitionByResource.get(resourceName);
+  }
+
+  public Integer getEstimatedMaxTopStateCount() {
 
 Review comment:
   Same as for `getEstimatedMaxPartitionCount()`: return `int`? No need to autobox into `Integer`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307939730
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 ##
 @@ -19,9 +19,104 @@
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed 
cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map _estimatedMaxPartitionByResource = new 
HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map>> _assignmentForFaultZoneMap = 
new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSetAll the partition replicas that are managed by the 
rebalancer
+   * @param instanceCount The count of all the active instances that can be 
used to host partitions.
+   */
+  ClusterContext(Set replicaSet, int instanceCount) {
+int totalReplicas = 0;
+int totalTopStateReplicas = 0;
+
+for (Map.Entry> entry : replicaSet.stream()
+
.collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) 
{
+  int replicas = entry.getValue().size();
+  totalReplicas += replicas;
+
+  int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, 
instanceCount));
+  _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+  totalTopStateReplicas +=
+  
entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+}
+
+_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, 
instanceCount);
+_estimatedMaxTopStateCount = 
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map>> getAssignmentForFaultZoneMap() {
+return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {
+return _estimatedMaxPartitionCount;
+  }
+
+  public Integer getEstimatedMaxPartitionByResource(String resourceName) {
+return _estimatedMaxPartitionByResource.get(resourceName);
+  }
+
+  public Integer getEstimatedMaxTopStateCount() {
+return _estimatedMaxTopStateCount;
+  }
+
+  public Set getPartitionsForResourceAndFaultZone(String faultZoneId, 
String resourceName) {
+if (_assignmentForFaultZoneMap.containsKey(faultZoneId) && 
_assignmentForFaultZoneMap
+.get(faultZoneId).containsKey(resourceName)) {
+  return _assignmentForFaultZoneMap.get(faultZoneId).get(resourceName);
+}
+return Collections.emptySet();
+  }
+
+  void addPartitionToFaultZone(String faultZoneId, String resourceName, String 
partition) {
+if (!_assignmentForFaultZoneMap.containsKey(faultZoneId)) {
+  _assignmentForFaultZoneMap.put(faultZoneId, new HashMap<>());
+}
+if 
(!_assignmentForFaultZoneMap.get(faultZoneId).containsKey(resourceName)) {
+  _assignmentForFaultZoneMap.get(faultZoneId).put(resourceName, new 
HashSet<>());
+}
+
_assignmentForFaultZoneMap.get(faultZoneId).get(resourceName).add(partition);
 
 Review comment:
   For this code block, would it look cleaner if we used `computeIfAbsent()`?
   ```java
   _assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
 .computeIfAbsent(resourceName, k -> new 
HashSet<>())
 .add(partition);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307937930
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 ##
 @@ -19,9 +19,104 @@
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed 
cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map _estimatedMaxPartitionByResource = new 
HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map>> _assignmentForFaultZoneMap = 
new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSetAll the partition replicas that are managed by the 
rebalancer
+   * @param instanceCount The count of all the active instances that can be 
used to host partitions.
+   */
+  ClusterContext(Set replicaSet, int instanceCount) {
+int totalReplicas = 0;
+int totalTopStateReplicas = 0;
+
+for (Map.Entry> entry : replicaSet.stream()
+
.collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) 
{
+  int replicas = entry.getValue().size();
+  totalReplicas += replicas;
+
+  int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, 
instanceCount));
+  _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+  totalTopStateReplicas +=
+  
entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+}
+
+_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, 
instanceCount);
+_estimatedMaxTopStateCount = 
estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map>> getAssignmentForFaultZoneMap() {
+return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {
 
 Review comment:
   Return `int`? No need to autobox into Integer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307936299
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 ##
 @@ -19,9 +19,124 @@
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains 
the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, 
String replicaState,
+  int statePriority) {
+_partitionName = partitionName;
+_replicaState = replicaState;
+_statePriority = statePriority;
+_resourceName = resourceConfig.getResourceName();
+_capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+_resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+_resourceMaxPartitionsPerInstance = 
resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map getCapacity() {
+return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+return _partitionName;
+  }
+
+  public String getReplicaState() {
+return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+return _statePriority;
+  }
+
+  public String getResourceName() {
+return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
+return _resourceMaxPartitionsPerInstance;
+  }
+
+  public String getReplicaKey() {
+return generateReplicaKey(_resourceName, _partitionName, _replicaState);
+  }
+
+  @Override
+  public int compareTo(AssignableReplica replica) {
+if (!_resourceName.equals(replica._resourceName)) {
+  return _resourceName.compareTo(replica._resourceName);
+}
+if (!_partitionName.equals(replica._partitionName)) {
+  return _partitionName.compareTo(replica._partitionName);
+}
+if (!_replicaState.equals(replica._replicaState)) {
+  return _replicaState.compareTo(replica._replicaState);
+}
+return 0;
+  }
+
+  public static String generateReplicaKey(String resourceName, String 
partitioName, String state) {
+return String.format("%s-%s-%s", resourceName, partitioName, state);
+  }
+
+  /**
+   * Parse the resource config for the partition weight.
+   */
+  private Map fetchCapacityUsage(String partitionName,
+  ResourceConfig resourceConfig) {
+Map> capacityMap;
+
+try {
+  capacityMap = resourceConfig.getPartitionCapacityMap();
+} catch (IOException ex) {
+  throw new IllegalArgumentException("Invalid partition capacity 
configuration.", ex);
+}
+
+Map partitionCapacity = capacityMap.get(partitionName);
+if (partitionCapacity == null) {
+  partitionCapacity.get(ResourceConfig.DEFAULT_PARTITION_KEY);
 
 Review comment:
   Typo? What does this do? If partitionCapacity is null, this would throw a 
NullPointerException.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307936299
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 ##
 @@ -19,9 +19,124 @@
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains 
the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, 
String replicaState,
+  int statePriority) {
+_partitionName = partitionName;
+_replicaState = replicaState;
+_statePriority = statePriority;
+_resourceName = resourceConfig.getResourceName();
+_capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+_resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+_resourceMaxPartitionsPerInstance = 
resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map getCapacity() {
+return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+return _partitionName;
+  }
+
+  public String getReplicaState() {
+return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+return _statePriority;
+  }
+
+  public String getResourceName() {
+return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
+return _resourceMaxPartitionsPerInstance;
+  }
+
+  public String getReplicaKey() {
+return generateReplicaKey(_resourceName, _partitionName, _replicaState);
+  }
+
+  @Override
+  public int compareTo(AssignableReplica replica) {
+if (!_resourceName.equals(replica._resourceName)) {
+  return _resourceName.compareTo(replica._resourceName);
+}
+if (!_partitionName.equals(replica._partitionName)) {
+  return _partitionName.compareTo(replica._partitionName);
+}
+if (!_replicaState.equals(replica._replicaState)) {
+  return _replicaState.compareTo(replica._replicaState);
+}
+return 0;
+  }
+
+  public static String generateReplicaKey(String resourceName, String 
partitioName, String state) {
+return String.format("%s-%s-%s", resourceName, partitioName, state);
+  }
+
+  /**
+   * Parse the resource config for the partition weight.
+   */
+  private Map fetchCapacityUsage(String partitionName,
+  ResourceConfig resourceConfig) {
+Map> capacityMap;
+
+try {
+  capacityMap = resourceConfig.getPartitionCapacityMap();
+} catch (IOException ex) {
+  throw new IllegalArgumentException("Invalid partition capacity 
configuration.", ex);
+}
+
+Map partitionCapacity = capacityMap.get(partitionName);
+if (partitionCapacity == null) {
+  partitionCapacity.get(ResourceConfig.DEFAULT_PARTITION_KEY);
 
 Review comment:
   Typo? What does this do? If partitionCapacity is null, this would throw a 
NullPointerException.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307935935
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 ##
 @@ -19,9 +19,124 @@
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains 
the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, 
String replicaState,
+  int statePriority) {
+_partitionName = partitionName;
+_replicaState = replicaState;
+_statePriority = statePriority;
+_resourceName = resourceConfig.getResourceName();
+_capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+_resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+_resourceMaxPartitionsPerInstance = 
resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map getCapacity() {
+return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+return _partitionName;
+  }
+
+  public String getReplicaState() {
+return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+return _statePriority;
+  }
+
+  public String getResourceName() {
+return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
 
 Review comment:
   I checked the callers and assignments, and it seems there is no need to return an 
`Integer`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster 
model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307933787
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = 
LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set _instanceTags;
+  private String _faultZone;
+  private Map> _disabledPartitionsMap;
+  private Map _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  // 
+  private Map> _currentAssignments;
+  // 
+  private Map> _currentTopStateAssignments;
+  // 
+  private Map _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String 
instanceName,
+  Collection existingAssignment) {
+_instanceName = instanceName;
+refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+_currentAssignments = new HashMap<>();
+_currentTopStateAssignments = new HashMap<>();
+_currentCapacity = new HashMap<>();
+_totalReplicaAssignmentCount = 0;
+_highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current 
assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also 
be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function 
should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the 
AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+  Collection existingAssignment) {
+reset();
+
+InstanceConfig instanceConfig = 
clusterCache.getInstanceConfigMap().get(_instanceName);
+ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+_currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+_faultZone = computeFaultZone(clusterConfig, instanceConfig);
+_instanceTags = new HashSet<>(instanceConfig.getTags());
+_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+_maxCapacity = instanceConfig.getInstanceCapacityMap();
+_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+  throw new HelixException(String
+  .format("Resource {} already has a replica from partition {} on this 
node",
+  assignableReplica.getResourceName(), 
assignableReplica.getPartitionName()));
+} else {
+  if (assignableReplica.isReplicaTopState()) {
+addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+  }
+  _totalReplicaAssignmentCount += 1;
+  assignableReplica.getCapacity().entrySet().stream()
+  .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), 
entry.getValue()));
+}
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not 
updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws 
IllegalArgumentException {
+String resourceName = assignableReplica.getResourceName();
+String partitionName = assignableReplica.getPartitionName();
+if (!_currentAssignments.containsKey(resourceName)) {
+  _logger.warn("Resource " + resourceName +