Add Multi-round CRUSH rebalance strategy.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0f7c3e42
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0f7c3e42
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0f7c3e42

Branch: refs/heads/helix-0.6.x
Commit: 0f7c3e42080ba8e2b17e36ca1c5c51c6209b0f03
Parents: 7d0885c
Author: Lei Xia <l...@linkedin.com>
Authored: Mon Jun 27 15:46:13 2016 -0700
Committer: Lei Xia <l...@linkedin.com>
Committed: Sun Feb 5 18:50:52 2017 -0800

----------------------------------------------------------------------
 .../MultiRoundCrushRebalanceStrategy.java       | 327 +++++++++++++++++++
 .../rebalancer/topology/Topology.java           |  75 ++++-
 .../integration/TestCrushAutoRebalance.java     |   5 +-
 3 files changed, 405 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0f7c3e42/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
new file mode 100644
index 0000000..93bd980
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -0,0 +1,327 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.InstanceConfig;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.util.JenkinsHash;
+
+
+/**
+ * Multi-round CRUSH partition mapping strategy.
+ * This gives more even partition distribution in case of small number of partitions,
+ * but number of partitions to be reshuffled during node outage could be higher than CrushRebalanceStrategy.
+ */
+public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy {
+  private String _resourceName;
+  private List<String> _partitions;
+  private Topology _clusterTopo;
+  private int _replicas;
+  private LinkedHashMap<String, Integer> _stateCountMap;
+
+  private static final int MAX_ITERATION = 5;
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _replicas = countStateReplicas(states);
+    _stateCountMap = states;
+  }
+
+  /**
+   * Compute the preference lists and (optional partition-state mapping) for the given resource.
+   *
+   * @param allNodes       All instances
+   * @param liveNodes      List of live instances
+   * @param currentMapping current replica mapping
+   * @param clusterData    cluster data
+   * @return ZNRecord whose list fields map each partition to its ordered preference list
+   * @throws HelixException if a map can not be found
+   */
+  @Override public ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+      ClusterDataCache clusterData) throws HelixException {
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    _clusterTopo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+    Node root = _clusterTopo.getRootNode();
+
+    Map<String, List<Node>> zoneMapping = new HashMap<String, List<Node>>();
+    for (int i = 0; i < _partitions.size(); i++) {
+      String partitionName = _partitions.get(i);
+      long pData = partitionName.hashCode();
+
+      // select zones for this partition
+      List<Node> zones = select(root, _clusterTopo.getFaultZoneType(), pData, _replicas);
+      zoneMapping.put(partitionName, zones);
+    }
+
+    /* map the position in preference list to the state */
+    Map<Integer, String> idxStateMap = new HashMap<Integer, String>();
+    int i = 0;
+    for (Map.Entry<String, Integer> e : _stateCountMap.entrySet()) {
+      String state = e.getKey();
+      int count = e.getValue();
+      for (int j = 0; j < count; j++) {
+        idxStateMap.put(i + j, state);
+      }
+      i += count;
+    }
+
+    // Final mapping <partition, state> -> list(node)
+    Map<String, Map<String, List<Node>>> partitionStateMapping =
+        new HashMap<String, Map<String, List<Node>>>();
+
+    for (Node zone : _clusterTopo.getFaultZones()) {
+      // partition state -> list(partitions)
+      LinkedHashMap<String, List<String>> statePartitionMap =
+          new LinkedHashMap<String, List<String>>();
+      // TODO: move this outside?
+      for (Map.Entry<String, List<Node>> e : zoneMapping.entrySet()) {
+        String partition = e.getKey();
+        List<Node> zones = e.getValue();
+        for (int k = 0; k < zones.size(); k++) {
+          if (zones.get(k).equals(zone)) {
+            String state = idxStateMap.get(k);
+            if (!statePartitionMap.containsKey(state)) {
+              statePartitionMap.put(state, new ArrayList<String>());
+            }
+            statePartitionMap.get(state).add(partition);
+          }
+        }
+      }
+
+      for (String state : _stateCountMap.keySet()) {
+        List<String> partitions = statePartitionMap.get(state);
+        if (partitions != null && !partitions.isEmpty()) {
+          Map<String, Node> assignments = singleZoneMapping(zone, partitions);
+          for (String partition : assignments.keySet()) {
+            Node node = assignments.get(partition);
+            if (!partitionStateMapping.containsKey(partition)) {
+              partitionStateMapping.put(partition, new HashMap<String, List<Node>>());
+            }
+            Map<String, List<Node>> stateMapping = partitionStateMapping.get(partition);
+            if (!stateMapping.containsKey(state)) {
+              stateMapping.put(state, new ArrayList<Node>());
+            }
+            stateMapping.get(state).add(node);
+          }
+        }
+      }
+    }
+
+    return generateZNRecord(_resourceName, _partitions, partitionStateMapping);
+  }
+
+  private ZNRecord generateZNRecord(String resource, List<String> partitions,
+      Map<String, Map<String, List<Node>>> partitionStateMapping) {
+    Map<String, List<String>> newPreferences = new HashMap<String, List<String>>();
+    for (int i = 0; i < partitions.size(); i++) {
+      String partitionName = partitions.get(i);
+      Map<String, List<Node>> stateNodeMap = partitionStateMapping.get(partitionName);
+      // Tolerate states with no assigned nodes (e.g. the selected zone is failed).
+      List<String> preferenceList = new ArrayList<String>();
+      for (String state : _stateCountMap.keySet()) {
+        List<Node> nodes = stateNodeMap == null ? null : stateNodeMap.get(state);
+        if (nodes == null) {
+          continue;
+        }
+        for (Node node : nodes) {
+          preferenceList.add(node.getName());
+        }
+      }
+      newPreferences.put(partitionName, preferenceList);
+    }
+    ZNRecord result = new ZNRecord(resource);
+    result.setListFields(newPreferences);
+
+    return result;
+  }
+
+  /**
+   * Compute mapping of partition to node in a single zone.
+   * Assumption: A partition should have only one replica at one zone.
+   * Will apply CRUSH multiple times until all partitions are mostly evenly distributed.
+   *
+   * @param zone       the zone
+   * @param partitions partitions to be assigned to nodes in the given zone (not modified).
+   * @return partition to node mapping in this zone.
+   */
+  private Map<String, Node> singleZoneMapping(Node zone, List<String> partitions) {
+    if (zone.isFailed() || zone.getWeight() == 0 || partitions.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    long totalWeight = zone.getWeight();
+    int totalPartition = partitions.size();
+
+    // node -> assigned partitions (keys may come from cloned trees; relies on Node equality).
+    Map<Node, List<String>> nodePartitionsMap = new HashMap<Node, List<String>>();
+
+    List<String> partitionsToAssign = new ArrayList<String>(partitions);
+    Map<Node, List<String>> toRemovedMap = new HashMap<Node, List<String>>();
+
+    int iteration = 0;
+    Node root = zone;
+    while (iteration++ < MAX_ITERATION) {
+      for (Map.Entry<Node, List<String>> e : toRemovedMap.entrySet()) {
+        List<String> curAssignedPartitions = nodePartitionsMap.get(e.getKey());
+        List<String> toRemoved = e.getValue();
+        curAssignedPartitions.removeAll(toRemoved);
+        partitionsToAssign.addAll(toRemoved);
+      }
+      toRemovedMap.clear(); // each batch of removals must be applied only once
+      for (String p : partitionsToAssign) {
+        long pData = p.hashCode();
+        List<Node> nodes = select(root, _clusterTopo.getEndNodeType(), pData, 1);
+        for (Node n : nodes) {
+          if (!nodePartitionsMap.containsKey(n)) {
+            nodePartitionsMap.put(n, new ArrayList<String>());
+          }
+          nodePartitionsMap.get(n).add(p);
+        }
+      }
+
+      Map<String, Integer> newNodeWeight = new HashMap<String, Integer>();
+      Set<String> completedNodes = new HashSet<String>();
+      for (Node node : Topology.getAllLeafNodes(zone)) {
+        if (node.isFailed()) {
+          completedNodes.add(node.getName());
+          continue;
+        }
+        long weight = node.getWeight();
+        double ratio = ((double) weight) / (double) totalWeight;
+        int target = (int) Math.floor(ratio * totalPartition);
+
+        List<String> assignedPartitions = nodePartitionsMap.get(node);
+        int numPartitions = 0;
+        if (assignedPartitions != null) {
+          numPartitions = assignedPartitions.size();
+        }
+        if (numPartitions > target + 1) {
+          int remove = numPartitions - target - 1;
+          Collections.sort(assignedPartitions); // shed a deterministic subset of this node's list
+          List<String> toRemoved = new ArrayList<String>(assignedPartitions.subList(0, remove));
+          toRemovedMap.put(node, toRemoved);
+        }
+
+        int missing = target - numPartitions;
+        if (missing > 0) {
+          newNodeWeight.put(node.getName(), missing * 10);
+        } else {
+          completedNodes.add(node.getName());
+        }
+      }
+
+      if (newNodeWeight.isEmpty()) {
+        // already converged
+        break;
+      } else {
+        // iterate more: only under-filled nodes stay selectable, weighted by their deficit
+        root = Topology.clone(zone, newNodeWeight, completedNodes);
+      }
+
+      partitionsToAssign.clear();
+    }
+
+    Map<String, Node> partitionMap = new HashMap<String, Node>();
+    for (Map.Entry<Node, List<String>> e : nodePartitionsMap.entrySet()) {
+      Node n = e.getKey();
+      List<String> assigned = e.getValue();
+      for (String p : assigned) {
+        partitionMap.put(p, n);
+      }
+    }
+
+    return partitionMap;
+  }
+
+  /**
+   * Number of retries for finding an appropriate instance for a replica.
+   */
+  private static final int MAX_RETRY = 100;
+  private final JenkinsHash hashFun = new JenkinsHash();
+  private CRUSHPlacementAlgorithm placementAlgorithm = new CRUSHPlacementAlgorithm();
+
+  /**
+   * For given input, select a number of children with given type.
+   * The caller will either get the expected number of selected nodes as a result,
+   * or an exception will be thrown.
+   */
+  private List<Node> select(Node topNode, String nodeType, long data, int rf)
+      throws HelixException {
+    List<Node> zones = new ArrayList<Node>();
+    long input = data;
+    int count = rf;
+    int tries = 0;
+    while (zones.size() < rf) {
+      List<Node> selected = placementAlgorithm
+          .select(topNode, input, count, nodeType, nodeAlreadySelected(new HashSet<Node>(zones)));
+      // only the still-missing count is requested, so the total never exceeds rf
+      zones.addAll(selected);
+      count = rf - zones.size();
+      if (count > 0) {
+        input = hashFun.hash(input); // create a different hash value for retrying
+        tries++;
+        if (tries >= MAX_RETRY) {
+          throw new HelixException(
+              String.format("could not find all mappings after %d tries", tries));
+        }
+      }
+    }
+    return zones;
+  }
+
+  /**
+   * Use the predicate to reject already selected zones or nodes.
+   */
+  private Predicate<Node> nodeAlreadySelected(Set<Node> selectedNodes) {
+    return Predicates.not(Predicates.in(selectedNodes));
+  }
+
+  /**
+   * Counts the total number of replicas given a state-count mapping
+   * @return sum of all per-state replica counts
+   */
+  private int countStateReplicas(Map<String, Integer> stateCountMap) {
+    int total = 0;
+    for (Integer count : stateCountMap.values()) {
+      total += count;
+    }
+    return total;
+  }
+}


http://git-wip-us.apache.org/repos/asf/helix/blob/0f7c3e42/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 1057fad..1d34cbd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
@@ -141,6 +142,60 @@ public class Topology {
   }
 
   /**
+   * Returns all leaf nodes that belong in the tree. Returns itself if this node is a leaf.
+   *
+   * @return list of leaf nodes under (or equal to) the given root
+   */
+  public static List<Node> getAllLeafNodes(Node root) {
+    List<Node> nodes = new ArrayList<Node>();
+    if (root.isLeaf()) {
+      nodes.add(root);
+    } else {
+      for (Node child : root.getChildren()) {
+        nodes.addAll(getAllLeafNodes(child));
+      }
+    }
+    return nodes;
+  }
+
+  /**
+   * Clone a node tree structure, with node weight updated using specified new weight,
+   * and all nodes in {@code failedNodes} marked as failed.
+   *
+   * @param root original root of the tree
+   * @param newNodeWeight map of node name to its new weight. If absent, keep its original weight.
+   * @param failedNodes set of nodes that need to be failed.
+   * @return new root node.
+   */
+  public static Node clone(Node root, Map<String, Integer> newNodeWeight, Set<String> failedNodes) {
+    Node newRoot = cloneTree(root, newNodeWeight, failedNodes);
+    computeWeight(newRoot);
+    return newRoot;
+  }
+
+  private static Node cloneTree(Node root, Map<String, Integer> newNodeWeight, Set<String> failedNodes) {
+    Node newRoot = new Node(root);
+    if (newNodeWeight.containsKey(root.getName())) {
+      newRoot.setWeight(newNodeWeight.get(root.getName()));
+    }
+    if (failedNodes.contains(root.getName())) {
+      newRoot.setFailed(true);
+      newRoot.setWeight(0);
+    }
+
+    List<Node> children = root.getChildren();
+    if (children != null) {
+      for (int i = 0; i < children.size(); i++) {
+        Node newChild = cloneTree(children.get(i), newNodeWeight, failedNodes);
+        newChild.setParent(newRoot); // link to the clone, not the original tree
+        newRoot.addChild(newChild);
+      }
+    }
+
+    return newRoot;
+  }
+
+  /**
    * Creates a tree representing the cluster structure using default cluster topology definition
    * (i,e no topology definition given and no domain id set).
    */
@@ -172,7 +227,7 @@ public class Topology {
       if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
         weight = DEFAULT_NODE_WEIGHT;
       }
-      addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+      root = addEndNode(root, ins, pathValueMap, weight, _liveInstances);
     }
 
     return root;
@@ -280,6 +335,24 @@ public class Topology {
     return bstrTo32bit(h);
   }
 
+  /**
+   * Recursively recompute the weight of every non-leaf node as the sum of its non-failed
+   * children's weights. Leaf weights (possibly overridden by cloneTree) are left untouched.
+   */
+  private static void computeWeight(Node node) {
+    if (node.isLeaf()) {
+      return;
+    }
+    int weight = 0;
+    for (Node child : node.getChildren()) {
+      computeWeight(child); // refresh intermediate levels (e.g. racks) before summing
+      if (!child.isFailed()) {
+        weight += child.getWeight();
+      }
+    }
+    node.setWeight(weight);
+  }
+
   private long bstrTo32bit(byte[] bstr) {
     if (bstr.length < 4) {
       throw new IllegalArgumentException("hashed is less than 4 bytes!");


http://git-wip-us.apache.org/repos/asf/helix/blob/0f7c3e42/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
index 00a6169..026db1c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration;
  * under the License.
  */
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -98,7 +99,9 @@ public class TestCrushAutoRebalance extends ZkIntegrationTestBase {
 
   @DataProvider(name = "rebalanceStrategies")
   public static String [][] rebalanceStrategies() {
-    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}};
+    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()},
+        {"MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName()}
+    };
   }
 
   @Test(dataProvider = "rebalanceStrategies")