xyuanlu commented on code in PR #2944:
URL: https://github.com/apache/helix/pull/2944#discussion_r1794419593
##########
helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java:
##########
@@ -80,11 +114,27 @@ public void setCapacity(int capacity) {
}
/**
- * Get the ID of this node
+ * Get the instance name of this node
* @return The ID of this node
*/
- public String getId() {
- return _id;
+ public String getInstanceName() {
+ return _instanceName;
+ }
+
+ /**
+ * Get the logical id of this node
+ * @return The ID of this node
+ */
+ public String getLogicalId() {
+ return _logicaId;
+ }
+
+ /**
+ * Get the fault zone of this node
+ * @return The ID of this node
Review Comment:
Should this be "@return The fault zone of this node" rather than "The ID of this node"?
##########
helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java:
##########
@@ -98,8 +148,38 @@ public int getCurrentlyAssigned() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
-
sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned)
- .append("\ncapacity:").append(_capacity);
+ sb.append("##########\nname=").append(_instanceName).append("\nassigned:")
+
.append(_currentlyAssigned).append("\ncapacity:").append(_capacity).append("\nlogicalId:")
+ .append(_logicaId).append("\nfaultZone:").append(_faultZone);
return sb.toString();
}
+
+ @Override
+ public int compareTo(CapacityNode o) {
+ if (_logicaId != null) {
+ return _logicaId.compareTo(o.getLogicalId());
+ }
+ return _instanceName.compareTo(o.getInstanceName());
+ }
+
+ /**
+ * Computes the fault zone id based on the domain and fault zone type when
topology is enabled.
+ * For example, when
+ * the domain is "zone=2, instance=testInstance" and the fault zone type is
"zone", this function
+ * returns "2".
+ * If cannot find the fault zone type, this function leaves the fault zone
id as the instance name.
Review Comment:
Should we use the logical ID as the default fault zone if we can't find the
fault zone type?
##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java:
##########
@@ -74,34 +73,50 @@ public ZNRecord computePartitionAssignment(final
List<String> allNodes,
// Note the liveNodes parameter here might be processed within the
rebalancer, e.g. filter based on tags
Set<CapacityNode> assignableNodeSet = new
HashSet<>(clusterData.getSimpleCapacitySet());
Set<String> liveNodesSet = new HashSet<>(liveNodes);
- assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId()));
+ assignableNodeSet.removeIf(n ->
!liveNodesSet.contains(n.getInstanceName()));
+
+ // Convert the assignableNodes to map for quick lookup
+ Map<String, CapacityNode> assignableNodeMap = assignableNodeSet.stream()
+ .collect(Collectors.toMap(CapacityNode::getInstanceName, node ->
node));
// Populate valid state map given current mapping
Map<String, Set<String>> stateMap =
- populateValidAssignmentMapFromCurrentMapping(currentMapping,
assignableNodeSet);
+ populateValidAssignmentMapFromCurrentMapping(currentMapping,
assignableNodeMap);
if (logger.isDebugEnabled()) {
logger.debug("currentMapping: {}", currentMapping);
logger.debug("stateMap: {}", stateMap);
}
// Sort the assignable nodes by id
- List<CapacityNode> assignableNodeList =
-
assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId))
+ List<CapacityNode> assignableNodeList = assignableNodeSet.stream().sorted()
.collect(Collectors.toList());
// Assign partitions to node by order.
for (int i = 0, index = 0; i < _partitions.size(); i++) {
int startIndex = index;
+ Set<String> currentFaultZones = new HashSet<>();
int remainingReplica = _statesReplicaCount;
if (stateMap.containsKey(_partitions.get(i))) {
- remainingReplica = remainingReplica -
stateMap.get(_partitions.get(i)).size();
+ Set<String> existingReplicas = stateMap.get(_partitions.get(i));
+ remainingReplica = remainingReplica - existingReplicas.size();
+ for (String instanceName : existingReplicas) {
+
currentFaultZones.add(assignableNodeMap.get(instanceName).getFaultZone());
+ }
}
for (int j = 0; j < remainingReplica; j++) {
while (index - startIndex < assignableNodeList.size()) {
CapacityNode node = assignableNodeList.get(index++ %
assignableNodeList.size());
- if (node.canAdd(_resourceName, _partitions.get(i))) {
- stateMap.computeIfAbsent(_partitions.get(i), m -> new
HashSet<>()).add(node.getId());
+ // Valid assignment when following conditions match:
Review Comment:
May I suggest moving this logic to a separate function for extensibility?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]