[GitHub] [helix] pkuwm commented on a change in pull request #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.
URL: https://github.com/apache/helix/pull/357#discussion_r307947633
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
 ##
 @@ -115,14 +155,58 @@ public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor c
     this(admin, accessor, propertyStore, clusterName);
   }
 
+  public TaskDriver(HelixAdmin admin,
+      HelixDataAccessor accessor,
+      HelixPropertyStore propertyStore,
+      String clusterName) {
+    this(admin, accessor, propertyStore, clusterName, DEFAULT_POOL_SIZE);
+  }
+
   public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor,
-      HelixPropertyStore propertyStore, String clusterName) {
+      HelixPropertyStore propertyStore, String clusterName, int poolSize) {
     _admin = admin;
     _accessor = accessor;
     _propertyStore = propertyStore;
     _clusterName = clusterName;
+    _poolSize = poolSize;
+  }
+
+
+  /**
+   *  Start TaskDriver's thread-pool.
+   */
+  public void startPool() {
 
 Review comment:
   I've been thinking about this.
   It might be better to let callers pass in their own thread pool. The caller would then own the pool: it would create it, shut it down, and size it to its needs. The TaskDriver constructors also wouldn't need new parameters.
   
   I would offer the following methods:
   ```java
   public ExecutorService getExecutorService() {}
   public void setExecutorService(ExecutorService pool) {}
   ``` 
   
   @i3wangyi @jiajunwang @narendly What do you think of this idea?
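A minimal sketch of the caller-owned pool proposed above, applied to a read with a timeout in the spirit of `getWorkflows(long timeout)`. `PoolAwareDriver` and `callWithTimeout` are hypothetical names, not Helix API; the point is only that the driver borrows the `ExecutorService` and never creates or shuts it down.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for TaskDriver; only the pool plumbing is shown.
class PoolAwareDriver {
  // Owned by the caller: this class never creates or shuts down the pool.
  private volatile ExecutorService _executorService;

  public ExecutorService getExecutorService() {
    return _executorService;
  }

  public void setExecutorService(ExecutorService pool) {
    _executorService = pool;
  }

  // Runs a blocking read on the caller's pool and waits at most timeoutMs for it.
  public <T> T callWithTimeout(Callable<T> read, long timeoutMs) throws TimeoutException {
    ExecutorService pool = _executorService;
    if (pool == null) {
      throw new IllegalStateException("No ExecutorService set; call setExecutorService() first");
    }
    Future<T> future = pool.submit(read);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the slow read; the pool itself stays usable
      throw e;
    } catch (ExecutionException e) {
      throw new RuntimeException("Read failed", e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for read", e);
    }
  }
}
```

With this split, the caller picks the pool size and lifetime (for example `Executors.newFixedThreadPool(4)` at startup and `shutdown()` on exit), and the existing constructor signatures stay as they are.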


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.
URL: https://github.com/apache/helix/pull/357#discussion_r307947633
 
 

 
 Review comment:
   I've been thinking about this.
   It might be better to let callers pass in their own thread pool. The caller would then own the pool: it would create it, shut it down, and size it to its needs.
   I would offer the following methods:
   ```java
   public ExecutorService getExecutorService() {}
   public void setExecutorService(ExecutorService pool) {}
   ``` 
   
   @i3wangyi @jiajunwang @narendly What do you think of this idea?




[GitHub] [helix] kaisun2000 opened a new issue #364: Fix bugs in RoutingTableProvider metrics and enhance logs to record time spent

2019-07-26 Thread GitBox
kaisun2000 opened a new issue #364: Fix bugs in RoutingTableProvider metrics and enhance logs to record time spent
URL: https://github.com/apache/helix/issues/364
 
 
   **Issues:**
   When CurrentStateCache updates its snapshot, it misses all the existing partitions that have a state change.
   
   The RoutingTableProvider callback runs on the main event thread, and the time it spends there is not recorded in the logs.
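For the second issue, a minimal sketch of recording how long a callback holds the calling thread. `CallbackTimer.runTimed` is a hypothetical helper, not part of `RoutingTableProvider`, and it logs through `java.util.logging` only to stay self-contained; Helix code would use its own SLF4J logger.

```java
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical helper (not Helix code): times a callback and logs the elapsed time.
final class CallbackTimer {
  private static final Logger LOG = Logger.getLogger(CallbackTimer.class.getName());

  private CallbackTimer() {
  }

  // Runs the callback on the current thread and returns the elapsed milliseconds.
  // The time is logged even if the callback throws.
  static long runTimed(String callbackName, Runnable callback) {
    long start = System.nanoTime();
    long elapsedMs;
    try {
      callback.run();
    } finally {
      elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      LOG.info(String.format("%s took %d ms on thread %s", callbackName, elapsedMs,
          Thread.currentThread().getName()));
    }
    return elapsedMs;
  }
}
```

Logging the thread name alongside the duration makes it visible when the time is being spent on the main event thread rather than on a separate worker.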




[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307943700
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterDataProvider.java
 ##
 @@ -0,0 +1,146 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The data provider generates the Cluster Model based on the controller's data cache.
+ */
+public class ClusterDataProvider {
+  /**
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
+   *                               resources that are not in this list will be removed from the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in the calculation.
+   *                               Note this list can be different from the real active node list
+   *                               according to the rebalancer logic.
+   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
+   * @param baselineAssignment     The persisted Baseline assignment.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
+   *                               previous rebalance.
+   * @return The cluster model as the input for the upcoming rebalance.
+   */
+  public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
+      Map resourceMap, Set activeInstances,
+      Map> clusterChanges,
+      Map baselineAssignment, Map bestPossibleAssignment) {
+    // Initialize all assignable replicas according to the input
+    Set assignableReplicas =
+        generateAssignableReplicas(dataProvider, resourceMap, activeInstances.size(),
+            bestPossibleAssignment, clusterChanges);
+
+    // TODO split the to-be-assigned replications to separate list.
+    Map> assignedReplicaMap = new HashMap<>();
+
+    // Construct cluster context.
+    ClusterContext context = new ClusterContext(assignableReplicas, activeInstances.size());
+
+    // Construct all the assignble nodes and initialize with the confirmed assignment.
+    Set assignableNodes = activeInstances.stream().map(
+        instanceName -> new AssignableNode(dataProvider, instanceName,
+            assignedReplicaMap.getOrDefault(instanceName, Collections.emptySet())))
+        .collect(Collectors.toSet());
+
+    // Initial the cluster context object with the confirmed assignments.
+    Map>> assignmentForFaultZoneMap = new HashMap<>();
+    assignableNodes.stream()
+        .forEach(node -> addNodeAssignmentsToFaultZone(node, assignmentForFaultZoneMap));
+    context.setAssignmentForFaultZoneMap(assignmentForFaultZoneMap);
+
+    return new ClusterModel(context, assignableReplicas, assignableNodes, baselineAssignment,
+        bestPossibleAssignment);
+  }
+
+  /**
+   * Find all the replications that need to be reallocated.
+   *
+   * @param dataProvider           The cluster status cache that contains the current cluster status.
+   * @param resourceMap            All the valid resources that are managed by the rebalancer.
+   * @param bestPossibleAssignment The persisted Best Possible State.
+   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
+   * @return A set of assignable replications that need reallocation.
+   */
+  private static Set 

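`generateClusterModel` folds each node's confirmed assignment into a per-fault-zone map before handing it to the `ClusterContext`, whose field comment describes the shape as `map{zoneName : map{resourceName : set(partitionNames)}}`. The body of `addNodeAssignmentsToFaultZone` is not in the quoted diff, so the following is only a sketch under that assumed shape; `FaultZoneIndex` and its parameters are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: merge one node's resource -> partitions assignment into a
// zone -> resource -> partitions map, the shape described for ClusterContext.
final class FaultZoneIndex {
  private FaultZoneIndex() {
  }

  static void addNodeAssignments(String faultZone, Map<String, Set<String>> nodeAssignments,
      Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
    Map<String, Set<String>> zoneAssignments =
        assignmentForFaultZoneMap.computeIfAbsent(faultZone, zone -> new HashMap<>());
    for (Map.Entry<String, Set<String>> entry : nodeAssignments.entrySet()) {
      // Nodes in the same zone share one partition set per resource; the node's own
      // sets are copied, not aliased.
      zoneAssignments.computeIfAbsent(entry.getKey(), resource -> new HashSet<>())
          .addAll(entry.getValue());
    }
  }
}
```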
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307940906
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set _instanceTags;
+  private String _faultZone;
+  private Map> _disabledPartitionsMap;
+  private Map _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  // 
+  private Map> _currentAssignments;
+  // 
+  private Map> _currentTopStateAssignments;
+  // 
+  private Map _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String instanceName,
+      Collection existingAssignment) {
+    _instanceName = instanceName;
+    refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+    _currentAssignments = new HashMap<>();
+    _currentTopStateAssignments = new HashMap<>();
+    _currentCapacity = new HashMap<>();
+    _totalReplicaAssignmentCount = 0;
+    _highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+      Collection existingAssignment) {
+    reset();
+
+    InstanceConfig instanceConfig = clusterCache.getInstanceConfigMap().get(_instanceName);
+    ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+    _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+    _faultZone = computeFaultZone(clusterConfig, instanceConfig);
+    _instanceTags = new HashSet<>(instanceConfig.getTags());
+    _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+    _maxCapacity = instanceConfig.getInstanceCapacityMap();
+    _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+    assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+    if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+      throw new HelixException(String
+          .format("Resource {} already has a replica from partition {} on this node",
+              assignableReplica.getResourceName(), assignableReplica.getPartitionName()));
+    } else {
+      if (assignableReplica.isReplicaTopState()) {
+        addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+      }
+      _totalReplicaAssignmentCount += 1;
+      assignableReplica.getCapacity().entrySet().stream()
+          .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), entry.getValue()));
+    }
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws IllegalArgumentException {
+    String resourceName = assignableReplica.getResourceName();
+    String partitionName = assignableReplica.getPartitionName();
+    if (!_currentAssignments.containsKey(resourceName)) {
+      _logger.warn("Resource " + resourceName + 

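One detail in the quoted `assign()`: `String.format` only understands `%` conversions, so the SLF4J-style `{}` placeholders are printed literally and the resource and partition names are silently dropped from the exception message (extra format arguments are ignored, not rejected). A minimal sketch of the same duplicate-replica guard using `%s`, assuming a simplified node that tracks `resource -> partitions` and a single integer capacity; `SimpleNode` is hypothetical, and `IllegalStateException` stands in for `HelixException` so the block compiles on its own.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified node: tracks resource -> partitions and one capacity dimension.
final class SimpleNode {
  private final Map<String, Set<String>> _currentAssignments = new HashMap<>();
  private int _remainingCapacity;

  SimpleNode(int capacity) {
    _remainingCapacity = capacity;
  }

  void assign(String resourceName, String partitionName, int capacityUsage) {
    Set<String> partitions =
        _currentAssignments.computeIfAbsent(resourceName, resource -> new HashSet<>());
    if (!partitions.add(partitionName)) {
      // %s, not {}: String.format does not understand SLF4J-style placeholders.
      throw new IllegalStateException(String.format(
          "Resource %s already has a replica from partition %s on this node", resourceName,
          partitionName));
    }
    // Capacity is only consumed once the replica is actually recorded.
    _remainingCapacity -= capacityUsage;
  }

  int getRemainingCapacity() {
    return _remainingCapacity;
  }
}
```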
[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307940435
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##

[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307940965
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##

[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307943962
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
 ##
 @@ -19,9 +19,129 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import org.apache.helix.model.IdealState;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class wraps the required input for the rebalance algorithm.
  */
-public class ClusterModel { }
+public class ClusterModel {
+  private final ClusterContext _clusterContext;
+  // >>
+  private final Map> _assignableReplicaIndex;
+  private final Map _assignableNodeMap;
+
+  // Records about the previous assignment
+  private final Map _baselineAssignment;
+  private final Map _bestPossibleAssignment;
+
+  /**
+   * @param clusterContext         The initialized cluster context.
+   * @param assignableReplicas     The replications to be assigned.
+   *                               Note that the replicas in this list shall not be included while initializing the context and assignable nodes.
+   * @param assignableNodes        The active instances.
+   * @param baselineAssignment     The recorded baseline assignment.
+   * @param bestPossibleAssignment The current best possible assignment.
+   */
+  ClusterModel(ClusterContext clusterContext, Set assignableReplicas,
+      Set assignableNodes, Map baselineAssignment,
+      Map bestPossibleAssignment) {
+    _clusterContext = clusterContext;
+
+    // Index all the replicas to be assigned.
+    _assignableReplicaIndex = new HashMap<>();
+    assignableReplicas.stream().forEach(rep -> {
+      Map replicaMap =
+          _assignableReplicaIndex.getOrDefault(rep.getResourceName(), new HashMap<>());
+      replicaMap.put(rep.getReplicaKey(), rep);
 
 Review comment:
   `computeIfAbsent`?
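A sketch of what the `computeIfAbsent` suggestion could look like for the replica index. With `getOrDefault(..., new HashMap<>())`, a freshly created inner map is not stored in the outer map unless it is explicitly `put` back, and a throwaway `HashMap` is allocated on every call; `computeIfAbsent` creates and stores the inner map once and returns the stored instance. `ReplicaRef` and `ReplicaIndexer` are hypothetical stand-ins for `AssignableReplica` and the constructor loop.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AssignableReplica: just the two keys the index needs.
final class ReplicaRef {
  final String resourceName;
  final String replicaKey;

  ReplicaRef(String resourceName, String replicaKey) {
    this.resourceName = resourceName;
    this.replicaKey = replicaKey;
  }
}

final class ReplicaIndexer {
  private ReplicaIndexer() {
  }

  // Builds resourceName -> (replicaKey -> replica).
  static Map<String, Map<String, ReplicaRef>> index(List<ReplicaRef> replicas) {
    Map<String, Map<String, ReplicaRef>> index = new HashMap<>();
    for (ReplicaRef rep : replicas) {
      // Creates and stores the inner map on first use, then returns the stored map.
      index.computeIfAbsent(rep.resourceName, resource -> new HashMap<>())
          .put(rep.replicaKey, rep);
    }
    return index;
  }
}
```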




[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307935935
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 ##
 @@ -19,9 +19,124 @@
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
+      int statePriority) {
+    _partitionName = partitionName;
+    _replicaState = replicaState;
+    _statePriority = statePriority;
+    _resourceName = resourceConfig.getResourceName();
+    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+    _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+    _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map getCapacity() {
+    return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+    return _partitionName;
+  }
+
+  public String getReplicaState() {
+    return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+    return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+    return _statePriority;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+    return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
 
 Review comment:
   Checked callers and assignments which are all using `int`, it seems there is 
no need to return an `Integer` here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307938009
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 ##
 @@ -19,9 +19,104 @@
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map _estimatedMaxPartitionByResource = new HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map>> _assignmentForFaultZoneMap = new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSet    All the partition replicas that are managed by the rebalancer
+   * @param instanceCount The count of all the active instances that can be used to host partitions.
+   */
+  ClusterContext(Set replicaSet, int instanceCount) {
+    int totalReplicas = 0;
+    int totalTopStateReplicas = 0;
+
+    for (Map.Entry> entry : replicaSet.stream()
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+      int replicas = entry.getValue().size();
+      totalReplicas += replicas;
+
+      int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
+      _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+      totalTopStateReplicas +=
+          entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+    }
+
+    _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
+    _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map>> getAssignmentForFaultZoneMap() {
+    return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {
+    return _estimatedMaxPartitionCount;
+  }
+
+  public Integer getEstimatedMaxPartitionByResource(String resourceName) {
+    return _estimatedMaxPartitionByResource.get(resourceName);
+  }
+
+  public Integer getEstimatedMaxTopStateCount() {
 
 Review comment:
   Same. Return `int`?


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307939730



 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 ##
 @@ -19,9 +19,104 @@
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSetAll the partition replicas that are managed by the rebalancer
+   * @param instanceCount The count of all the active instances that can be used to host partitions.
+   */
+  ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+    int totalReplicas = 0;
+    int totalTopStateReplicas = 0;
+
+    for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+      int replicas = entry.getValue().size();
+      totalReplicas += replicas;
+
+      int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
+      _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+      totalTopStateReplicas +=
+          entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+    }
+
+    _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
+    _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
+    return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {
+    return _estimatedMaxPartitionCount;
+  }
+
+  public Integer getEstimatedMaxPartitionByResource(String resourceName) {
+    return _estimatedMaxPartitionByResource.get(resourceName);
+  }
+
+  public Integer getEstimatedMaxTopStateCount() {
+    return _estimatedMaxTopStateCount;
+  }
+
+  public Set<String> getPartitionsForResourceAndFaultZone(String faultZoneId, String resourceName) {
+    if (_assignmentForFaultZoneMap.containsKey(faultZoneId) && _assignmentForFaultZoneMap
+        .get(faultZoneId).containsKey(resourceName)) {
+      return _assignmentForFaultZoneMap.get(faultZoneId).get(resourceName);
+    }
+    return Collections.emptySet();
+  }
+
+  void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
+    if (!_assignmentForFaultZoneMap.containsKey(faultZoneId)) {
+      _assignmentForFaultZoneMap.put(faultZoneId, new HashMap<>());
+    }
+    if (!_assignmentForFaultZoneMap.get(faultZoneId).containsKey(resourceName)) {
+      _assignmentForFaultZoneMap.get(faultZoneId).put(resourceName, new HashSet<>());
+    }
+    _assignmentForFaultZoneMap.get(faultZoneId).get(resourceName).add(partition);
 
 Review comment:
   For this code block, would it look cleaner if we used `computeIfAbsent()`?
   ```java
   _assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
       .computeIfAbsent(resourceName, k -> new HashSet<>())
       .add(partition);
   ```
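To check that the suggested `computeIfAbsent()` chain behaves the same as the explicit `containsKey()`/`put()` version in the diff, here is a small self-contained sketch. The class `FaultZoneIndex` and its method names are hypothetical, written only for illustration; the map shape mirrors `_assignmentForFaultZoneMap` (zone → resource → partitions).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper mirroring the zone -> resource -> partitions map under review.
class FaultZoneIndex {
  private final Map<String, Map<String, Set<String>>> _byZone = new HashMap<>();

  // Explicit version, equivalent to addPartitionToFaultZone() in the diff.
  void addVerbose(String zone, String resource, String partition) {
    if (!_byZone.containsKey(zone)) {
      _byZone.put(zone, new HashMap<>());
    }
    if (!_byZone.get(zone).containsKey(resource)) {
      _byZone.get(zone).put(resource, new HashSet<>());
    }
    _byZone.get(zone).get(resource).add(partition);
  }

  // Suggested version: each computeIfAbsent creates the inner container only when it is
  // missing and returns the existing or newly created value, so the calls can be chained.
  void addCompact(String zone, String resource, String partition) {
    _byZone.computeIfAbsent(zone, k -> new HashMap<>())
        .computeIfAbsent(resource, k -> new HashSet<>())
        .add(partition);
  }

  // Read path that never inserts empty containers for unknown keys.
  Set<String> partitions(String zone, String resource) {
    return _byZone.getOrDefault(zone, Collections.emptyMap())
        .getOrDefault(resource, Collections.emptySet());
  }

  Map<String, Map<String, Set<String>>> snapshot() {
    return _byZone;
  }
}
```

Both variants produce equal maps; the chained form simply removes the repeated `get(faultZoneId)` lookups and reads as one expression.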


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307937930



 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
 ##
 @@ -19,9 +19,104 @@
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed cluster.
+ * This class tracks the rebalance-related global cluster status.
  */
-public class ClusterContext { }
+public class ClusterContext {
+  private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+  // This estimation helps to ensure global partition count evenness
+  private final int _estimatedMaxPartitionCount;
+  // This estimation helps to ensure global top state replica count evenness
+  private final int _estimatedMaxTopStateCount;
+  // This estimation helps to ensure per-resource partition count evenness
+  private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
+
+  // map{zoneName : map{resourceName : set(partitionNames)}}
+  private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+
+  /**
+   * Construct the cluster context based on the current instance status.
+   *
+   * @param replicaSetAll the partition replicas that are managed by the rebalancer
+   * @param instanceCount The count of all the active instances that can be used to host partitions.
+   */
+  ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+    int totalReplicas = 0;
+    int totalTopStateReplicas = 0;
+
+    for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+      int replicas = entry.getValue().size();
+      totalReplicas += replicas;
+
+      int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
+      _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+      totalTopStateReplicas +=
+          entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+    }
+
+    _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
+    _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+  }
+
+  public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
+    return _assignmentForFaultZoneMap;
+  }
+
+  public Integer getEstimatedMaxPartitionCount() {
 
 Review comment:
   Return `int`? No need to autobox into Integer.
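A small illustration of why a primitive return type is the safer default here, assuming the backing field stays `int`. `ContextSketch` is a hypothetical stand-in, not the Helix class. It also shows the one getter where `Integer` is arguably justified: a map lookup such as `getEstimatedMaxPartitionByResource` returns `null` for an unknown resource, and auto-unboxing that `null` into an `int` throws a `NullPointerException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ClusterContext, used only to compare return types.
class ContextSketch {
  private final int _estimatedMaxPartitionCount;
  private final Map<String, Integer> _maxByResource = new HashMap<>();

  ContextSketch(int estimatedMaxPartitionCount) {
    _estimatedMaxPartitionCount = estimatedMaxPartitionCount;
    _maxByResource.put("db", 3);
  }

  // The field is a primitive, so returning int avoids boxing and can never be null.
  int getEstimatedMaxPartitionCount() {
    return _estimatedMaxPartitionCount;
  }

  // Map.get returns null for a missing key; an Integer return type exposes that to callers.
  Integer getEstimatedMaxPartitionByResource(String resourceName) {
    return _maxByResource.get(resourceName);
  }

  // Returns true if auto-unboxing the lookup result for the given resource throws.
  static boolean unboxingThrows(ContextSketch ctx, String resourceName) {
    try {
      int value = ctx.getEstimatedMaxPartitionByResource(resourceName); // auto-unboxing
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }
}
```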


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307936299



 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 ##
 @@ -19,9 +19,124 @@
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable<AssignableReplica> {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map<String, Integer> _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
+      int statePriority) {
+    _partitionName = partitionName;
+    _replicaState = replicaState;
+    _statePriority = statePriority;
+    _resourceName = resourceConfig.getResourceName();
+    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+    _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+    _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map<String, Integer> getCapacity() {
+    return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+    return _partitionName;
+  }
+
+  public String getReplicaState() {
+    return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+    return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+    return _statePriority;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+    return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
+    return _resourceMaxPartitionsPerInstance;
+  }
+
+  public String getReplicaKey() {
+    return generateReplicaKey(_resourceName, _partitionName, _replicaState);
+  }
+
+  @Override
+  public int compareTo(AssignableReplica replica) {
+    if (!_resourceName.equals(replica._resourceName)) {
+      return _resourceName.compareTo(replica._resourceName);
+    }
+    if (!_partitionName.equals(replica._partitionName)) {
+      return _partitionName.compareTo(replica._partitionName);
+    }
+    if (!_replicaState.equals(replica._replicaState)) {
+      return _replicaState.compareTo(replica._replicaState);
+    }
+    return 0;
+  }
+
+  public static String generateReplicaKey(String resourceName, String partitioName, String state) {
+    return String.format("%s-%s-%s", resourceName, partitioName, state);
+  }
+
+  /**
+   * Parse the resource config for the partition weight.
+   */
+  private Map<String, Integer> fetchCapacityUsage(String partitionName,
+      ResourceConfig resourceConfig) {
+    Map<String, Map<String, Integer>> capacityMap;
+
+    try {
+      capacityMap = resourceConfig.getPartitionCapacityMap();
+    } catch (IOException ex) {
+      throw new IllegalArgumentException("Invalid partition capacity configuration.", ex);
+    }
+
+    Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
+    if (partitionCapacity == null) {
+      partitionCapacity.get(ResourceConfig.DEFAULT_PARTITION_KEY);
 
 Review comment:
   Typo? What does this do? If `partitionCapacity` is null, this would throw a NullPointerException.
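If the intent of the flagged line was to fall back to the default entry when a partition has no explicit capacity (an assumption; the diff does not say so), the lookup has to read from `capacityMap` and assign the result, rather than dereference the `null` it just got. A minimal sketch under that assumption: `CapacityLookup` is hypothetical, and the key value `"DEFAULT"` is an assumed stand-in for `ResourceConfig.DEFAULT_PARTITION_KEY`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the fallback; not the Helix implementation.
class CapacityLookup {
  // Assumed value, standing in for ResourceConfig.DEFAULT_PARTITION_KEY.
  static final String DEFAULT_PARTITION_KEY = "DEFAULT";

  static Map<String, Integer> capacityFor(String partitionName,
      Map<String, Map<String, Integer>> capacityMap) {
    Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
    if (partitionCapacity == null) {
      // Read the default entry from the outer map and assign it.
      partitionCapacity = capacityMap.get(DEFAULT_PARTITION_KEY);
    }
    if (partitionCapacity == null) {
      // Neither a specific nor a default entry exists: fail with a clear message.
      throw new IllegalArgumentException(
          "No capacity configured for partition " + partitionName + " and no default entry.");
    }
    return partitionCapacity;
  }
}
```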


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307935935



 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 ##
 @@ -19,9 +19,124 @@
  * under the License.
  */
 
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
 /**
- * A placeholder before we have the implementation.
- *
  * This class represents a partition replication that needs to be allocated.
  */
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable<AssignableReplica> {
+  private final String _partitionName;
+  private final String _resourceName;
+  private final String _resourceInstanceGroupTag;
+  private final int _resourceMaxPartitionsPerInstance;
+  private final Map<String, Integer> _capacityUsage;
+  // The priority of the replica's state
+  private final int _statePriority;
+  // The state of the replica
+  private final String _replicaState;
+
+  /**
+   * @param resourceConfig The resource config for the resource which contains the replication.
+   * @param partitionName  The replication's partition name.
+   * @param replicaState   The state of the replication.
+   * @param statePriority  The priority of the replication's state.
+   */
+  AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
+      int statePriority) {
+    _partitionName = partitionName;
+    _replicaState = replicaState;
+    _statePriority = statePriority;
+    _resourceName = resourceConfig.getResourceName();
+    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+    _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+    _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
+  }
+
+  public Map<String, Integer> getCapacity() {
+    return _capacityUsage;
+  }
+
+  public String getPartitionName() {
+    return _partitionName;
+  }
+
+  public String getReplicaState() {
+    return _replicaState;
+  }
+
+  public boolean isReplicaTopState() {
+    return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+  }
+
+  public int getStatePriority() {
+    return _statePriority;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  public String getResourceInstanceGroupTag() {
+    return _resourceInstanceGroupTag;
+  }
+
+  public Integer getResourceMaxPartitionsPerInstance() {
 
 Review comment:
   I checked the callers and the assignment; it seems there is no need to return an `Integer` here?
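One concrete hazard of an `Integer` return type, beyond the boxing itself: a caller that compares two results with `==` compares object references, which only happens to work for the small values the JVM caches. `ReplicaLimits` is a hypothetical illustration of an `int` field exposed both ways, not Helix code.

```java
// Hypothetical illustration of int vs Integer getters over the same int field.
class ReplicaLimits {
  private final int _maxPartitionsPerInstance;

  ReplicaLimits(int maxPartitionsPerInstance) {
    _maxPartitionsPerInstance = maxPartitionsPerInstance;
  }

  // Autoboxes the field on every call; == between two results compares references.
  Integer boxedMax() {
    return _maxPartitionsPerInstance;
  }

  // Returns the primitive directly; == compares values.
  int primitiveMax() {
    return _maxPartitionsPerInstance;
  }

  static boolean sameLimitPrimitive(ReplicaLimits a, ReplicaLimits b) {
    return a.primitiveMax() == b.primitiveMax();
  }

  // Boxed values must be compared with equals(); a bare == may be false for large values.
  static boolean sameLimitBoxed(ReplicaLimits a, ReplicaLimits b) {
    return a.boxedMax().equals(b.boxedMax());
  }
}
```

With an `int` getter, callers cannot fall into the reference-comparison trap at all.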


[GitHub] [helix] pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
pkuwm commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307933787



 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
+  private static final Logger _logger = LoggerFactory.getLogger(AssignableNode.class.getName());
+
+  // basic node information
+  private final String _instanceName;
+  private Set<String> _instanceTags;
+  private String _faultZone;
+  private Map<String, List<String>> _disabledPartitionsMap;
+  private Map<String, Integer> _maxCapacity;
+  private int _maxPartition;
+
+  // proposed assignment tracking
+  //
+  private Map<String, Set<String>> _currentAssignments;
+  //
+  private Map<String, Set<String>> _currentTopStateAssignments;
+  //
+  private Map<String, Integer> _currentCapacity;
+  // runtime usage tracking
+  private int _totalReplicaAssignmentCount;
+  private float _highestCapacityUtilization;
+
+  AssignableNode(ResourceControllerDataProvider clusterCache, String instanceName,
+      Collection<AssignableReplica> existingAssignment) {
+    _instanceName = instanceName;
+    refresh(clusterCache, existingAssignment);
+  }
+
+  private void reset() {
+    _currentAssignments = new HashMap<>();
+    _currentTopStateAssignments = new HashMap<>();
+    _currentCapacity = new HashMap<>();
+    _totalReplicaAssignmentCount = 0;
+    _highestCapacityUtilization = 0;
+  }
+
+  /**
+   * Update the node with a ClusterDataCache. This resets the current assignment and recalculate currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could
+   * subject to changes. If the assumption is no longer true, this function should become private.
+   *
+   * @param clusterCache - the current cluster cache to initial the AssignableNode.
+   */
+  private void refresh(ResourceControllerDataProvider clusterCache,
+      Collection<AssignableReplica> existingAssignment) {
+    reset();
+
+    InstanceConfig instanceConfig = clusterCache.getInstanceConfigMap().get(_instanceName);
+    ClusterConfig clusterConfig = clusterCache.getClusterConfig();
+
+    _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+    _faultZone = computeFaultZone(clusterConfig, instanceConfig);
+    _instanceTags = new HashSet<>(instanceConfig.getTags());
+    _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+    _maxCapacity = instanceConfig.getInstanceCapacityMap();
+    _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+    assignNewBatch(existingAssignment);
+  }
+
+  /**
+   * Assign a replica to the node.
+   *
+   * @param assignableReplica - the replica to be assigned
+   */
+  void assign(AssignableReplica assignableReplica) {
+    if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+      throw new HelixException(String
+          .format("Resource {} already has a replica from partition {} on this node",
+              assignableReplica.getResourceName(), assignableReplica.getPartitionName()));
+    } else {
+      if (assignableReplica.isReplicaTopState()) {
+        addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+      }
+      _totalReplicaAssignmentCount += 1;
+      assignableReplica.getCapacity().entrySet().stream()
+          .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), entry.getValue()));
+    }
+  }
+
+  /**
+   * Release a replica from the node.
+   * If the replication is not on this node, the assignable node is not updated.
+   *
+   * @param assignableReplica - the replica to be released
+   */
+  void release(AssignableReplica assignableReplica) throws IllegalArgumentException {
+    String resourceName = assignableReplica.getResourceName();
+    String partitionName = assignableReplica.getPartitionName();
+    if (!_currentAssignments.containsKey(resourceName)) {
+      _logger.warn("Resource " + resourceName +

[GitHub] [helix] jiajunwang opened a new pull request #363: Fix the race condition while Helix refresh cluster status cache.

2019-07-26 Thread GitBox
jiajunwang opened a new pull request #363: Fix the race condition while Helix refresh cluster status cache.
URL: https://github.com/apache/helix/pull/363
 
 
   ### Issues
   #331
   ### Description
   The design relies on a single read to avoid locking during change notification. However, a later update introduced an additional read. Because notification is lock-free, the two reads may return different results, which leaves the cache in an inconsistent state. As a result, an expected rebalance might not happen.
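The failure mode described above, where two unsynchronized reads of shared state observe different versions, can be illustrated with a small deterministic sketch. `VersionedSource`, `StatusCache`, and their fields are hypothetical names, not Helix classes; the point is only that deriving every cached field from one read keeps the fields consistent, whereas reading twice can mix versions if a notification updates the source in between.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical source whose value can change between any two reads (no locking).
class VersionedSource {
  private final AtomicInteger _version = new AtomicInteger();

  int read() {
    return _version.get();
  }

  void update() {
    _version.incrementAndGet();
  }
}

// Hypothetical cache holding two fields that must come from the same version.
class StatusCache {
  int liveInstancesVersion;
  int currentStateVersion;

  // Problematic shape: two reads, so an update in between leaves the fields inconsistent.
  void refreshTwoReads(VersionedSource source, Runnable concurrentWriter) {
    liveInstancesVersion = source.read();
    concurrentWriter.run(); // simulates a change notification landing between the reads
    currentStateVersion = source.read();
  }

  // Single-read shape: take one snapshot, then derive every field from it.
  void refreshSingleRead(VersionedSource source, Runnable concurrentWriter) {
    int snapshot = source.read();
    concurrentWriter.run();
    liveInstancesVersion = snapshot;
    currentStateVersion = snapshot;
  }

  boolean isConsistent() {
    return liveInstancesVersion == currentStateVersion;
  }
}
```

The single-read version may still be one update behind, but it is never internally inconsistent; the next notification triggers another refresh.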
   ### Tests
   Running
   


[GitHub] [helix] pkuwm commented on issue #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.

2019-07-26 Thread GitBox
pkuwm commented on issue #357: [WIP] Add getWorkflows(long timeout) to TaskDriver.
URL: https://github.com/apache/helix/pull/357#issuecomment-515564146
 
 
   @i3wangyi I've made the thread pool executor a private field of the class. To avoid creating the thread pool every time a new TaskDriver is constructed, I added a `startPool()` method that creates it on demand: if `getWorkflows(long timeout)` is never called, there is no need to create the thread pool at all.

   As for the pool size, we will let the caller choose a size that suits its needs.

   I didn't use `ScheduledExecutorService` because it is designed for running tasks after a delay (or periodically), which we don't need. Thanks for the suggestion.
   
   > I'm afraid the underlying the executor is creating a thread pool with one worker. If this is the case, there's certainly overhead if each time when the method gets invoked, the thread gets created and deleted after the method call. I don't think creating a thread every time is cheap, I would suggest making it a private field. Also for timeout mechanism, there's better choice for you https://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html
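The approach described in this comment (a private pool created on demand by `startPool()`, sized by the caller, with a per-call timeout) can be sketched with a plain `ExecutorService` and `Future.get(timeout, unit)`. `LazyPoolFetcher` and its methods are hypothetical and simplified; this is not the code in the PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a pool created lazily and reused across timed calls.
class LazyPoolFetcher {
  private final int _poolSize;
  private ExecutorService _pool; // null until startPool() is called

  LazyPoolFetcher(int poolSize) {
    _poolSize = poolSize;
  }

  // Creates the pool once; callers that never need timed fetches never create threads.
  synchronized void startPool() {
    if (_pool == null) {
      _pool = Executors.newFixedThreadPool(_poolSize);
    }
  }

  // Fixed-pool threads are non-daemon, so the owner must shut the pool down.
  synchronized void shutdownPool() {
    if (_pool != null) {
      _pool.shutdownNow();
      _pool = null;
    }
  }

  // Runs the task on the shared pool and waits at most timeoutMs for the result.
  <T> T fetch(Callable<T> task, long timeoutMs)
      throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService pool;
    synchronized (this) {
      if (_pool == null) {
        throw new IllegalStateException("startPool() has not been called");
      }
      pool = _pool;
    }
    Future<T> future = pool.submit(task);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the slow task so it does not keep a worker busy
      throw e;
    }
  }
}
```

Only `submit` and a timed `get` are needed here, which matches the reasoning above for not using `ScheduledExecutorService`.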


[GitHub] [helix] dasahcc merged pull request #360: Remove TODO NPE log for computeResourceBestPossibleState

2019-07-26 Thread GitBox
dasahcc merged pull request #360: Remove TODO NPE log for computeResourceBestPossibleState
URL: https://github.com/apache/helix/pull/360
 
 
   


[GitHub] [helix] jiajunwang merged pull request #348: Adding the configuration items of the WAGED rebalancer.

2019-07-26 Thread GitBox
jiajunwang merged pull request #348: Adding the configuration items of the WAGED rebalancer.
URL: https://github.com/apache/helix/pull/348
 
 
   


[GitHub] [helix] jiajunwang commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.

2019-07-26 Thread GitBox
jiajunwang commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.
URL: https://github.com/apache/helix/pull/348#discussion_r307863842



 ##
 File path: helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
 ##
 @@ -775,17 +794,38 @@ private void validate() {
           }
         }
       }
+
+      if (_partitionCapacityMap != null) {
+        if (_partitionCapacityMap.keySet().stream()
+            .noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured without the DEFAULT capacity!");
+        }
+        if (_partitionCapacityMap.values().stream()
+            .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured with negative capacity value!");
+        }
+      }
     }
 
     public ResourceConfig build() {
       // TODO: Reenable the validation in the future when ResourceConfig is ready.
       // validate();
 
-      return new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef,
 
 Review comment:
   I discussed this with Yi offline and have converted it to the preferred style.
   However, the builder pattern used in ResourceConfig is still not the recommended approach. We will fix it in the future.
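The two rules this diff adds to `validate()` (the partition capacity map must contain a `DEFAULT` entry, and no capacity value may be negative) can be exercised in isolation. `PartitionCapacityValidator` is a hypothetical standalone version written for illustration, and the key value `"DEFAULT"` is an assumption standing in for `DEFAULT_PARTITION_KEY`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of the partition capacity checks in the diff.
class PartitionCapacityValidator {
  // Assumed value, standing in for ResourceConfig.DEFAULT_PARTITION_KEY.
  static final String DEFAULT_PARTITION_KEY = "DEFAULT";

  static void validate(Map<String, Map<String, Integer>> partitionCapacityMap) {
    if (partitionCapacityMap == null) {
      return; // nothing configured, nothing to check (same as the null guard in the diff)
    }
    // Equivalent to keySet().stream().noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY)).
    if (!partitionCapacityMap.containsKey(DEFAULT_PARTITION_KEY)) {
      throw new IllegalArgumentException(
          "Partition capacity is configured without the DEFAULT capacity!");
    }
    if (partitionCapacityMap.values().stream()
        .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) {
      throw new IllegalArgumentException(
          "Partition capacity is configured with negative capacity value!");
    }
  }

  // Convenience wrapper for checks that want a boolean instead of an exception.
  static boolean isValid(Map<String, Map<String, Integer>> partitionCapacityMap) {
    try {
      validate(partitionCapacityMap);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }
}
```

Using `containsKey` for the first rule gives the same result as the stream-based check while doing a single hash lookup.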


[GitHub] [helix] i3wangyi commented on a change in pull request #362: The WAGED rebalancer cluster model implementation

2019-07-26 Thread GitBox
i3wangyi commented on a change in pull request #362: The WAGED rebalancer cluster model implementation
URL: https://github.com/apache/helix/pull/362#discussion_r307853283



 ##
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 ##
 @@ -19,10 +19,290 @@
  * under the License.
  */
 
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
 /**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
  */
-public class AssignableNode { }
+public class AssignableNode {
 
 Review comment:
   Generally, let's order the fields with the `private final` fields first, followed by the `private` fields.
   
   Because `final` means the field is a necessary
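A minimal sketch of the suggested ordering, with `private final` fields declared before the mutable `private` fields. The class and field names are hypothetical and are not the actual `AssignableNode` fields.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical class; the field names are illustrative, not the real AssignableNode fields.
final class OrderedFieldsExample {
  // private final fields first: assigned once in the constructor.
  private final String _instanceName;
  private final Map<String, Integer> _maxCapacity;

  // private (mutable) fields next: updated after construction.
  private int _assignedReplicaCount;

  OrderedFieldsExample(String instanceName, Map<String, Integer> maxCapacity) {
    _instanceName = instanceName;
    _maxCapacity = new HashMap<>(maxCapacity); // defensive copy
    _assignedReplicaCount = 0;
  }

  // Not thread safe, in line with the AssignableNode javadoc in the diff.
  void assignReplica() {
    _assignedReplicaCount++;
  }

  String getInstanceName() {
    return _instanceName;
  }

  int getMaxCapacity(String capacityKey) {
    return _maxCapacity.getOrDefault(capacityKey, 0);
  }

  int getAssignedReplicaCount() {
    return _assignedReplicaCount;
  }
}
```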


[GitHub] [helix] i3wangyi commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.

2019-07-26 Thread GitBox
i3wangyi commented on a change in pull request #348: Adding the configuration items of the WAGED rebalancer.
URL: https://github.com/apache/helix/pull/348#discussion_r307850803
 
 

 ##
 File path: helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
 ##
 @@ -775,17 +794,38 @@ private void validate() {
   }
 }
   }
+
+  if (_partitionCapacityMap != null) {
+if (_partitionCapacityMap.keySet().stream()
+.noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) {
+  throw new IllegalArgumentException(
+  "Partition capacity is configured without the DEFAULT capacity!");
+}
+if (_partitionCapacityMap.values().stream()
+.anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) {
+  throw new IllegalArgumentException(
+  "Partition capacity is configured with negative capacity value!");
+}
+  }
 }
 
 public ResourceConfig build() {
   // TODO: Reenable the validation in the future when ResourceConfig is ready.
   // validate();
 
-  return new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef,
 
 Review comment:
   In the builder pattern, the `build()` method usually consists of a single line:
   ```java
   return new ResourceConfig(this);
   ```
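A minimal sketch of the suggested style, in which the product's private constructor reads its values from the builder, so `build()` stays a single line. `SimpleResourceConfig` and its three fields are hypothetical stand-ins, not the real `ResourceConfig` API.

```java
// Hypothetical, simplified config class; the real ResourceConfig has many more fields.
final class SimpleResourceConfig {
  private final String _resourceId;
  private final int _numPartitions;
  private final boolean _monitorDisabled;

  // Private constructor: instances are created only through the Builder.
  private SimpleResourceConfig(Builder builder) {
    _resourceId = builder._resourceId;
    _numPartitions = builder._numPartitions;
    _monitorDisabled = builder._monitorDisabled;
  }

  String getResourceId() {
    return _resourceId;
  }

  int getNumPartitions() {
    return _numPartitions;
  }

  boolean isMonitorDisabled() {
    return _monitorDisabled;
  }

  static final class Builder {
    private final String _resourceId;
    private int _numPartitions;
    private boolean _monitorDisabled;

    Builder(String resourceId) {
      _resourceId = resourceId;
    }

    Builder setNumPartitions(int numPartitions) {
      _numPartitions = numPartitions;
      return this;
    }

    Builder setMonitorDisabled(boolean monitorDisabled) {
      _monitorDisabled = monitorDisabled;
      return this;
    }

    // The one-line build() suggested in the review.
    SimpleResourceConfig build() {
      return new SimpleResourceConfig(this);
    }
  }
}
```

Because the builder hands itself to the constructor, adding a field does not require extending a long positional argument list like the `new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, ...)` call removed in the diff.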


[GitHub] [helix] jiajunwang commented on issue #348: Adding the configuration items of the WAGED rebalancer.

2019-07-26 Thread GitBox
jiajunwang commented on issue #348: Adding the configuration items of the WAGED rebalancer.
URL: https://github.com/apache/helix/pull/348#issuecomment-515333872
 
 
   I tried using more Java 8 streams as @i3wangyi suggested; they are convenient.
   However, I will not convert all of the loops to streams, because some of them are too complicated.
   
   I reviewed the code again myself and found one issue, which is fixed in the new commit.
   Please help review the logic as well.
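To illustrate the trade-off between streams and loops, here is a sketch of the negative-capacity check from the `validate()` hunk written in both styles. The class and method names, and the `Map<String, Map<String, Integer>>` type, are assumptions. Both methods stop at the first negative value, since `anyMatch` short-circuits.

```java
import java.util.Map;

// Two equivalent ways to check whether any capacity value in a nested map is negative.
final class NegativeCapacityCheck {
  // Stream form, as used in the validate() hunk.
  static boolean hasNegativeStream(Map<String, Map<String, Integer>> capacityMap) {
    return capacityMap.values().stream()
        .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0));
  }

  // Equivalent explicit-loop form.
  static boolean hasNegativeLoop(Map<String, Map<String, Integer>> capacityMap) {
    for (Map<String, Integer> capacity : capacityMap.values()) {
      for (int value : capacity.values()) {
        if (value < 0) {
          return true; // Returns early, like anyMatch.
        }
      }
    }
    return false;
  }
}
```

For a one-predicate check like this, the stream form is shorter; loops with several branches or side effects are often easier to read as plain loops.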

