xyuanlu commented on code in PR #2944:
URL: https://github.com/apache/helix/pull/2944#discussion_r1794419593


##########
helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java:
##########
@@ -80,11 +114,27 @@ public void setCapacity(int capacity) {
   }
 
   /**
-   * Get the ID of this node
+   * Get the instance name of this node
    * @return The ID of this node
    */
-  public String getId() {
-    return _id;
+  public String getInstanceName() {
+    return _instanceName;
+  }
+
+  /**
+   * Get the logical id of this node
+   * @return The ID of this node
+   */
+  public String getLogicalId() {
+    return _logicaId;
+  }
+
+  /**
+   * Get the fault zone of this node
+   * @return The ID of this node

Review Comment:
   Should this Javadoc say "@return The fault zone of this node" instead?



##########
helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java:
##########
@@ -98,8 +148,38 @@ public int getCurrentlyAssigned() {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    
sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned)
-        .append("\ncapacity:").append(_capacity);
+    sb.append("##########\nname=").append(_instanceName).append("\nassigned:")
+        
.append(_currentlyAssigned).append("\ncapacity:").append(_capacity).append("\nlogicalId:")
+        .append(_logicaId).append("\nfaultZone:").append(_faultZone);
     return sb.toString();
   }
+
+  @Override
+  public int compareTo(CapacityNode o) {
+    if (_logicaId != null) {
+      return _logicaId.compareTo(o.getLogicalId());
+    }
+    return _instanceName.compareTo(o.getInstanceName());
+  }
+
+  /**
+   * Computes the fault zone id based on the domain and fault zone type when 
topology is enabled.
+   * For example, when
+   * the domain is "zone=2, instance=testInstance" and the fault zone type is 
"zone", this function
+   * returns "2".
+   * If cannot find the fault zone type, this function leaves the fault zone 
id as the instance name.

Review Comment:
   Should we use the logical ID as the default fault zone if we can't find the
fault zone type?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java:
##########
@@ -74,34 +73,50 @@ public ZNRecord computePartitionAssignment(final 
List<String> allNodes,
     // Note the liveNodes parameter here might be processed within the 
rebalancer, e.g. filter based on tags
     Set<CapacityNode> assignableNodeSet = new 
HashSet<>(clusterData.getSimpleCapacitySet());
     Set<String> liveNodesSet = new HashSet<>(liveNodes);
-    assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId()));
+    assignableNodeSet.removeIf(n -> 
!liveNodesSet.contains(n.getInstanceName()));
+
+    // Convert the assignableNodes to map for quick lookup
+    Map<String, CapacityNode> assignableNodeMap = assignableNodeSet.stream()
+        .collect(Collectors.toMap(CapacityNode::getInstanceName, node -> 
node));
 
     //  Populate valid state map given current mapping
     Map<String, Set<String>> stateMap =
-        populateValidAssignmentMapFromCurrentMapping(currentMapping, 
assignableNodeSet);
+        populateValidAssignmentMapFromCurrentMapping(currentMapping, 
assignableNodeMap);
 
     if (logger.isDebugEnabled()) {
       logger.debug("currentMapping: {}", currentMapping);
       logger.debug("stateMap: {}", stateMap);
     }
 
     // Sort the assignable nodes by id
-    List<CapacityNode> assignableNodeList =
-        
assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId))
+    List<CapacityNode> assignableNodeList = assignableNodeSet.stream().sorted()
             .collect(Collectors.toList());
 
     // Assign partitions to node by order.
     for (int i = 0, index = 0; i < _partitions.size(); i++) {
       int startIndex = index;
+      Set<String> currentFaultZones = new HashSet<>();
       int remainingReplica = _statesReplicaCount;
       if (stateMap.containsKey(_partitions.get(i))) {
-        remainingReplica = remainingReplica - 
stateMap.get(_partitions.get(i)).size();
+        Set<String> existingReplicas = stateMap.get(_partitions.get(i));
+        remainingReplica = remainingReplica - existingReplicas.size();
+        for (String instanceName : existingReplicas) {
+          
currentFaultZones.add(assignableNodeMap.get(instanceName).getFaultZone());
+        }
       }
       for (int j = 0; j < remainingReplica; j++) {
         while (index - startIndex < assignableNodeList.size()) {
           CapacityNode node = assignableNodeList.get(index++ % 
assignableNodeList.size());
-          if (node.canAdd(_resourceName, _partitions.get(i))) {
-            stateMap.computeIfAbsent(_partitions.get(i), m -> new 
HashSet<>()).add(node.getId());
+          // Valid assignment when following conditions match:

Review Comment:
   May I suggest moving this logic to a separate function for extensibility?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to