zpinto commented on code in PR #2877: URL: https://github.com/apache/helix/pull/2877#discussion_r1727789514
########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java: ########## @@ -82,7 +84,7 @@ public void init(String resourceName, final List<String> partitions, @Override public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) { - int numReplicas = countStateReplicas(); + int numReplicas = calculateStatesReplicaCount(); Review Comment: Why aren't we using `calculateExpectedReplicaCount` instead? ########## helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java: ########## @@ -866,4 +907,220 @@ public void testWontMoveSinglePartitionUnnecessarily() { // finally, make sure we haven't moved it. Assert.assertEquals(finalPreferredNode, otherNode); } + + @Test + public void testAutoRebalanceStrategyWorkWithDisabledInstances() { + final String RESOURCE_NAME = "resource"; + final String[] PARTITIONS = {"resource_0", "resource_1", "resource_2"}; + final StateModelDefinition STATE_MODEL = LeaderStandbySMD.build(); + final int REPLICA_COUNT = 2; + final String[] NODES = {"n0", "n1"}; + + ResourceControllerDataProvider dataCache = buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Collections.emptySet()); + + // initial state, 2 nodes, no mapping + List<String> allNodes = Lists.newArrayList(NODES[0], NODES[1]); + List<String> liveNodes = Lists.newArrayList(NODES[0], NODES[1]); + Map<String, Map<String, String>> currentMapping = Maps.newHashMap(); + for (String partition : PARTITIONS) { + currentMapping.put(partition, new HashMap<String, String>()); + } + + // make sure that when the first node joins, a single replica is assigned fairly + List<String> partitions = ImmutableList.copyOf(PARTITIONS); + LinkedHashMap<String, Integer> stateCount = + 
STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT); + ZNRecord znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + Map<String, List<String>> preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + List<String> preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + } + + // now disable node 1, and make sure that it is not in the preference list + allNodes = new ArrayList<>(allNodes); + liveNodes = new ArrayList<>(liveNodes); + liveNodes.remove(NODES[0]); + for (String partition : PARTITIONS) { + Map<String, String> idealStateMap = znRecord.getMapField(partition); + currentMapping.put(partition, idealStateMap); + } + + stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), 1); + znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + // make sure the master is transferred to the other node + List<String> preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); + // Since node 0 is disabled, node 1 should be the only node in the preference list and it + // should be in the top state for every partition + Assert.assertTrue(znRecord.getListField(partition).contains(NODES[1]), + "invalid preference list for " + partition); + Assert.assertEquals(znRecord.getMapField(partition).get(NODES[1]), STATE_MODEL.getTopState()); + } + } + + @Test + public void 
testAutoRebalanceStrategyWorkWithDisabledButActiveInstances() { + final String RESOURCE_NAME = "resource"; + final String[] PARTITIONS = {"resource_0", "resource_1", "resource_2"}; + final StateModelDefinition STATE_MODEL = LeaderStandbySMD.build(); + final int REPLICA_COUNT = 2; + final String[] NODES = {"n0", "n1"}; + + ResourceControllerDataProvider dataCache = buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Collections.emptySet()); + // initial state, 2 node, no mapping + List<String> allNodes = Lists.newArrayList(NODES[0], NODES[1]); + List<String> liveNodes = Lists.newArrayList(NODES[0], NODES[1]); + Map<String, Map<String, String>> currentMapping = Maps.newHashMap(); + for (String partition : PARTITIONS) { + currentMapping.put(partition, new HashMap<String, String>()); + } + + // make sure that when nodes join, all partitions is assigned fairly + List<String> partitions = ImmutableList.copyOf(PARTITIONS); + LinkedHashMap<String, Integer> stateCount = + STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT); + ZNRecord znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + Map<String, List<String>> preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + List<String> preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + } + + // now disable node 0, and make sure the dataCache provides it + for (String partition : PARTITIONS) { + Map<String, String> idealStateMap = znRecord.getMapField(partition); + currentMapping.put(partition, idealStateMap); + } + dataCache = buildMockDataCache(RESOURCE_NAME, + 
ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Sets.newHashSet(NODES[0])); + + stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), 2); + znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + // make sure the size is equal to the number of active nodes + List<String> preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + // Since node 0 is disabled but active, it should appear in the preference list and it should + // be OFFLINE state for the all partitions. Since there are only 2 nodes, node 1 should be the + // top state for every partition + Assert.assertTrue(znRecord.getListField(partition).contains(NODES[1]), + "invalid preference list for " + partition); + Assert.assertEquals(znRecord.getMapField(partition).get(NODES[1]), STATE_MODEL.getTopState()); + Assert.assertTrue(znRecord.getListField(partition).contains(NODES[0]), + "invalid preference list for " + partition); + Assert.assertEquals(znRecord.getMapField(partition).get(NODES[0]), + STATE_MODEL.getInitialState()); + } + } + + @Test + public void testRebalanceWithErrorPartition() { + final String RESOURCE_NAME = "resource"; + final String[] PARTITIONS = {"resource_0", "resource_1", "resource_2"}; + final StateModelDefinition STATE_MODEL = LeaderStandbySMD.build(); + final int REPLICA_COUNT = 2; + final String[] NODES = {"n0", "n1", "n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9"}; + + ResourceControllerDataProvider dataCache = buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, 
Collections.emptySet()); + // initial state, 10 node, no mapping + List<String> allNodes = Lists.newArrayList(NODES); + List<String> liveNodes = Lists.newArrayList(NODES); + Map<String, Map<String, String>> currentMapping = Maps.newHashMap(); + for (String partition : PARTITIONS) { + currentMapping.put(partition, new HashMap<String, String>()); + } + + // make sure that when nodes join, all partitions is assigned fairly + List<String> partitions = ImmutableList.copyOf(PARTITIONS); + LinkedHashMap<String, Integer> stateCount = + STATE_MODEL.getStateCountMap(liveNodes.size(), allNodes.size()); + ZNRecord znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + Map<String, List<String>> preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + List<String> preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), allNodes.size(), + "invalid preference list for " + partition); + } + + // Suppose that one replica of partition 0 is in n0, and it has been in the ERROR state. + + for (String partition : PARTITIONS) { + Map<String, String> idealStateMap = znRecord.getMapField(partition); + currentMapping.put(partition, idealStateMap); + } + currentMapping.get(PARTITIONS[0]).put(NODES[0], "ERROR"); + // now disable node 0, and make sure the dataCache provides it. And add another node n10 to the + // cluster. We want to make sure the n10 can pick up another replica of partition 0,1,2. + allNodes = new ArrayList<>(allNodes); + liveNodes = new ArrayList<>(liveNodes); + liveNodes.remove(NODES[0]); + allNodes.add("n10"); Review Comment: Before we add another node, can we also check that we don't drop a partition on any of the remaining nodes after disabling the one with the error replica? 
########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java: ########## @@ -127,15 +129,16 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li _stateMap = generateStateMap(); // compute the preferred mapping if all nodes were up - _preferredAssignment = computePreferredPlacement(sortedAllNodes); + _preferredAssignment = computePreferredPlacement(sortedAllNodes, clusterData); // logger.info("preferred mapping:"+ preferredAssignment); Review Comment: Can we remove this line? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org