junkaixue commented on code in PR #2866: URL: https://github.com/apache/helix/pull/2866#discussion_r1702145903
########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java: ########## @@ -0,0 +1,205 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.controller.common.CapacityNode; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> { + private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); + private String _resourceName; + private List<String> _partitions; + private LinkedHashMap<String, Integer> _states; + + public StickyRebalanceStrategy() { + } + + @Override + public void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + } + + @Override + public ZNRecord computePartitionAssignment(final List<String> allNodes, + final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, + ResourceControllerDataProvider clusterData) { + ZNRecord znRecord = new ZNRecord(_resourceName); + if (liveNodes.isEmpty()) { + return znRecord; + } + + if (clusterData.getSimpleCapacitySet() == null) { + logger.warn("No capacity set for resource: " + _resourceName); + return znRecord; + } + + // Sort the assignable nodes by id + List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); + assignableNodes.sort(Comparator.comparing(CapacityNode::getId)); + + // Filter out the nodes if not in the liveNodes parameter + // Note the liveNodes parameter here might be processed within the rebalancer, e.g. 
filter based on tags + Set<String> liveNodesSet = new HashSet<>(liveNodes); + assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId())); + + // Populate valid state map given current mapping + Map<String, Map<String, String>> stateMap = + populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes); + + if (logger.isDebugEnabled()) { + logger.debug("currentMapping: {}", currentMapping); + logger.debug("stateMap: {}", stateMap); + } + + // Assign partitions to node by order. + for (int i = 0, index = 0; i < _partitions.size(); i++) { + int startIndex = index; + for (Map.Entry<String, Integer> entry : _states.entrySet()) { + String state = entry.getKey(); + int stateReplicaNumber = entry.getValue(); + stateMap.putIfAbsent(_partitions.get(i), new HashMap<>()); + // For this partition, compute existing number replicas + long existsReplicas = + stateMap.get(_partitions.get(i)).values().stream().filter(s -> s.equals(state)).count(); + for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) { + while (index - startIndex < assignableNodes.size()) { + CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); + if (node.canAdd(_resourceName, _partitions.get(i))) { + stateMap.get(_partitions.get(i)).put(node.getId(), state); + break; + } + } + + if (index - startIndex >= assignableNodes.size()) { + // If the all nodes have been tried out, then no node can be assigned. 
+ logger.warn("No enough assignable nodes for resource: " + _resourceName); + } + } + } + } + for (Map.Entry<String, Map<String, String>> entry : stateMap.entrySet()) { + znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue().keySet())); + } + if (logger.isDebugEnabled()) { + logger.debug("znRecord: {}", znRecord); + } + + return znRecord; + } + + private int countStateReplicas() { + int total = 0; + for (Integer count : _states.values()) { + total += count; + } + return total; + } + + /** + * Populates a valid state map from the current mapping, filtering out invalid nodes. + * + * @param currentMapping the current mapping of partitions to node states + * @param assignableNodes the list of nodes that can be assigned + * @return a map of partitions to valid node states + */ + private Map<String, Map<String, String>> populateValidStateMapFromCurrentMapping( + final Map<String, Map<String, String>> currentMapping, + final List<CapacityNode> assignableNodes) { + Map<String, Map<String, String>> validStateMap = new HashMap<>(); + // Convert the assignableNodes to map for quick lookup + Map<String, CapacityNode> assignableNodeMap = + assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, node -> node)); + if (currentMapping != null) { + for (Map.Entry<String, Map<String, String>> entry : currentMapping.entrySet()) { + String partition = entry.getKey(); + Map<String, String> currentNodeStateMap = new HashMap<>(entry.getValue()); + // Skip if current node state is invalid with state model + if (!isValidStateMap(currentNodeStateMap)) { + continue; + } + // Filter out invalid node assignment + currentNodeStateMap.entrySet() + .removeIf(e -> !isValidNodeAssignment(partition, e.getKey(), assignableNodeMap)); + + validStateMap.put(partition, currentNodeStateMap); + } + } + return validStateMap; + } + + /** + * Validates whether the provided state mapping is valid according to the defined state model. 
+ * + * @param currentNodeStateMap A map representing the actual state mapping where the key is the node ID and the value is the state. + * @return true if the state map is valid, false otherwise + */ + private boolean isValidStateMap(final Map<String, String> currentNodeStateMap) { + // Check if the size of the current state map exceeds the total state count in state model + if (currentNodeStateMap.size() > countStateReplicas()) { + return false; + } + + Map<String, Integer> tmpStates = new HashMap<>(_states); + for (String state : currentNodeStateMap.values()) { + // If the state is not defined in the state model, return invalid + if (!tmpStates.containsKey(state)) { + return false; + } + tmpStates.put(state, tmpStates.get(state) - 1); + } + + // Ensure no state has a negative count + return tmpStates.values().stream().noneMatch(count -> count < 0); Review Comment: This is really expensive... especially inside multiple nested for loops. We should rely on the previous checking code to validate it instead of this last line. ########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java: ########## @@ -0,0 +1,205 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.controller.common.CapacityNode; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> { + private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); + private String _resourceName; + private List<String> _partitions; + private LinkedHashMap<String, Integer> _states; + + public StickyRebalanceStrategy() { + } + + @Override + public void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + } + + @Override + public ZNRecord computePartitionAssignment(final List<String> allNodes, + final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, + ResourceControllerDataProvider clusterData) { + ZNRecord znRecord = new ZNRecord(_resourceName); + if (liveNodes.isEmpty()) { + return znRecord; + } + + if (clusterData.getSimpleCapacitySet() == null) { + logger.warn("No capacity set for resource: " + _resourceName); Review Comment: Use parameterized logging (SLF4J placeholders) instead of string concatenation. 
########## helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java: ########## @@ -78,13 +95,13 @@ public String getId() { * @return The number of partitions currently assigned to this node */ public int getCurrentlyAssigned() { - return _currentlyAssigned; + return _partitionMap.values().stream().mapToInt(Set::size).sum(); Review Comment: Real-time computation could be slow... I know this may be better-looking or clearer logic, but our target for computation performance should be high (ms level). A precomputed value is better. ########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java: ########## @@ -0,0 +1,205 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.controller.common.CapacityNode; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> { + private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); + private String _resourceName; + private List<String> _partitions; + private LinkedHashMap<String, Integer> _states; + + public StickyRebalanceStrategy() { + } + + @Override + public void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + } + + @Override + public ZNRecord computePartitionAssignment(final List<String> allNodes, + final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, + ResourceControllerDataProvider clusterData) { + ZNRecord znRecord = new ZNRecord(_resourceName); + if (liveNodes.isEmpty()) { + return znRecord; + } + + if (clusterData.getSimpleCapacitySet() == null) { + logger.warn("No capacity set for resource: " + _resourceName); + return znRecord; + } + + // Sort the assignable nodes by id + List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); + assignableNodes.sort(Comparator.comparing(CapacityNode::getId)); + + // Filter out the nodes if not in the liveNodes parameter + // Note the liveNodes parameter here might be processed within the rebalancer, e.g. 
filter based on tags + Set<String> liveNodesSet = new HashSet<>(liveNodes); + assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId())); + + // Populate valid state map given current mapping + Map<String, Map<String, String>> stateMap = + populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes); + + if (logger.isDebugEnabled()) { + logger.debug("currentMapping: {}", currentMapping); + logger.debug("stateMap: {}", stateMap); + } + + // Assign partitions to node by order. + for (int i = 0, index = 0; i < _partitions.size(); i++) { + int startIndex = index; + for (Map.Entry<String, Integer> entry : _states.entrySet()) { + String state = entry.getKey(); + int stateReplicaNumber = entry.getValue(); + stateMap.putIfAbsent(_partitions.get(i), new HashMap<>()); + // For this partition, compute existing number replicas + long existsReplicas = + stateMap.get(_partitions.get(i)).values().stream().filter(s -> s.equals(state)).count(); + for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) { + while (index - startIndex < assignableNodes.size()) { + CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); + if (node.canAdd(_resourceName, _partitions.get(i))) { + stateMap.get(_partitions.get(i)).put(node.getId(), state); + break; + } + } + + if (index - startIndex >= assignableNodes.size()) { + // If the all nodes have been tried out, then no node can be assigned. + logger.warn("No enough assignable nodes for resource: " + _resourceName); Review Comment: Same here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org