http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceNonRack.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceNonRack.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceNonRack.java deleted file mode 100644 index c2a165e..0000000 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceNonRack.java +++ /dev/null @@ -1,278 +0,0 @@ -package org.apache.helix.integration.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.helix.ConfigAccessor; -import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.integration.common.ZkStandAloneCMTestBase; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.BuiltInStateModelDefinitions; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; -import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { - final int NUM_NODE = 6; - protected static final int START_PORT = 12918; - protected static final int _PARTITIONS = 20; - - protected final String CLASS_NAME = getShortClassName(); - protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; - protected ClusterControllerManager _controller; - - protected ClusterSetup _setupTool = null; - List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); - Map<String, String> _nodeToTagMap = new HashMap<String, String>(); - List<String> _nodes = new ArrayList<String>(); - Set<String> _allDBs = new HashSet<String>(); - int _replica = 3; - - private static String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), - BuiltInStateModelDefinitions.MasterSlave.name(), - BuiltInStateModelDefinitions.LeaderStandby.name() - }; - - @BeforeClass - public void beforeClass() throws Exception { - System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursive(namespace); - } - _setupTool = new ClusterSetup(_gZkClient); - _setupTool.addCluster(CLUSTER_NAME, true); - - ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); - ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); - clusterConfig.setTopology("/instance"); - clusterConfig.setFaultZoneType("instance"); - configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - - for (int i = 0; i < NUM_NODE; i++) { - String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); - _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); - _nodes.add(storageNodeName); - String tag = "tag-" + i % 2; - _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); - _nodeToTagMap.put(storageNodeName, tag); - InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName); - instanceConfig.setDomain("instance=" + storageNodeName); - configAccessor.setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig); - } - - // start dummy participants - for (String node : _nodes) { - MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); - participant.syncStart(); - _participants.add(participant); - } - - // start controller - String controllerName = CONTROLLER_PREFIX + "_0"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); - _controller.syncStart(); - - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - //enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); - } - - @DataProvider(name = "rebalanceStrategies") public static String[][] rebalanceStrategies() { - return new String[][] { { "CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName() } }; - } - - @Test(dataProvider = "rebalanceStrategies", enabled = true) - public void test(String rebalanceStrategyName, String rebalanceStrategyClass) - throws Exception { - System.out.println("Test " + rebalanceStrategyName); - int i = 0; - for (String stateModel : _testModels) { - String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, - RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify(5000)); - - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateIsolation(is, ev, _replica); - } - } - - @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = "test") - public void testWithInstanceTag(String rebalanceStrategyName, String rebalanceStrategyClass) - throws Exception { - Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); - int i = 3; - for (String tag : tags) { - String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, - BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "", - rebalanceStrategyClass); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - is.setInstanceGroupTag(tag); - _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify(5000)); - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateIsolation(is, ev, _replica); - } - } - - @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test", - "testWithInstanceTag"}) - public void testLackEnoughLiveInstances(String rebalanceStrategyName, - String rebalanceStrategyClass) throws Exception { - System.out.println("TestLackEnoughInstances " + rebalanceStrategyName); - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - - // shutdown participants, keep only two left - for (int i = 2; i < _participants.size(); i++) { - _participants.get(i).syncStop(); - } - - int i = 0; - for (String stateModel : _testModels) { - String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, - RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify(5000)); - - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateIsolation(is, ev, 2); - } - } - - @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test", - "testWithInstanceTag"}) - public void testLackEnoughInstances(String rebalanceStrategyName, - String rebalanceStrategyClass) throws Exception { - System.out.println("TestLackEnoughInstances " + rebalanceStrategyName); - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - - // shutdown participants, keep only two left - for (int i = 2; i < _participants.size(); i++) { - MockParticipantManager p = _participants.get(i); - p.syncStop(); - _setupTool.getClusterManagementTool() - .enableInstance(CLUSTER_NAME, p.getInstanceName(), false); - Thread.sleep(50); - _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); - } - - int i = 0; - for (String stateModel : _testModels) { - String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; - _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, - RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); - _allDBs.add(db); - } - Thread.sleep(300); - - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setResources(_allDBs).build(); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _allDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateIsolation(is, ev, 2); - } - } - - /** - * Validate each partition is different instances and with necessary tagged instances. - */ - private void validateIsolation(IdealState is, ExternalView ev, int expectedReplica) { - String tag = is.getInstanceGroupTag(); - for (String partition : is.getPartitionSet()) { - Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); - Set<String> instancesInEV = assignmentMap.keySet(); - Assert.assertEquals(instancesInEV.size(), expectedReplica); - for (String instance : instancesInEV) { - if (tag != null) { - InstanceConfig config = - _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); - Assert.assertTrue(config.containsTag(tag)); - } - } - } - } - - @AfterMethod public void afterMethod() throws Exception { - for (String db : _allDBs) { - _setupTool.dropResourceFromCluster(CLUSTER_NAME, db); - } - _allDBs.clear(); - // waiting for all DB be dropped. - Thread.sleep(200); - } -}
http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceTopoplogyAwareDisabled.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceTopoplogyAwareDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceTopoplogyAwareDisabled.java deleted file mode 100644 index b5bd9d7..0000000 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalanceTopoplogyAwareDisabled.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.apache.helix.integration.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Date; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.tools.ClusterSetup; -import org.testng.annotations.BeforeClass; - -public class TestCrushAutoRebalanceTopoplogyAwareDisabled extends TestCrushAutoRebalanceNonRack { - @BeforeClass - public void beforeClass() throws Exception { - System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursive(namespace); - } - _setupTool = new ClusterSetup(_gZkClient); - _setupTool.addCluster(CLUSTER_NAME, true); - - for (int i = 0; i < NUM_NODE; i++) { - String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); - _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); - _nodes.add(storageNodeName); - String tag = "tag-" + i % 2; - _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); - _nodeToTagMap.put(storageNodeName, tag); - } - - // start dummy participants - for (String node : _nodes) { - MockParticipantManager participant = - new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); - participant.syncStart(); - _participants.add(participant); - } - - // start controller - String controllerName = CONTROLLER_PREFIX + "_0"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); - _controller.syncStart(); - - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java deleted file mode 100644 index c3852a4..0000000 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalance.java +++ /dev/null @@ -1,342 +0,0 @@ -package org.apache.helix.integration.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; -import org.apache.helix.integration.common.ZkIntegrationTestBase; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.BuiltInStateModelDefinitions; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; -import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; -import org.omg.PortableServer.THREAD_POLICY_ID; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -public class TestDelayedAutoRebalance extends ZkIntegrationTestBase { - final int NUM_NODE = 5; - protected static final int START_PORT = 12918; - protected static final int _PARTITIONS = 5; - - protected final String CLASS_NAME = getShortClassName(); - protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; - protected ClusterControllerManager _controller; - - protected ClusterSetup _setupTool = null; - List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); - int _replica = 3; - int _minActiveReplica = _replica - 1; - HelixClusterVerifier _clusterVerifier; - List<String> _testDBs = new ArrayList<String>(); - - @BeforeClass - public void beforeClass() throws Exception { - System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursive(namespace); - } - _setupTool = new ClusterSetup(_gZkClient); - _setupTool.addCluster(CLUSTER_NAME, true); - - for (int i = 0; i < NUM_NODE; i++) { - String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); - _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); - - // start dummy participants - MockParticipantManager participant = - new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); - participant.syncStart(); - _participants.add(participant); - } - - // start controller - String controllerName = CONTROLLER_PREFIX + "_0"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); - _controller.syncStart(); - - _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); - - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - } - - protected String[] TestStateModels = { - BuiltInStateModelDefinitions.MasterSlave.name(), - BuiltInStateModelDefinitions.OnlineOffline.name(), - BuiltInStateModelDefinitions.LeaderStandby.name() - }; - - /** - * The partition movement should be delayed (not happen immediately) after one single node goes offline. - * Delay is enabled by default, delay time is set in IdealState. - * @throws Exception - */ - @Test - public void testDelayedPartitionMovement() throws Exception { - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); - validateDelayedMovements(externalViewsBefore); - } - - @Test(dependsOnMethods = {"testDelayedPartitionMovement"}) - public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception { - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); - - Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); - validateDelayedMovements(externalViewsBefore); - - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); - } - - /** - * Test when two nodes go offline, the minimal active replica should be maintained. - * @throws Exception - */ - @Test(dependsOnMethods = {"testDelayedPartitionMovement"}) - public void testMinimalActiveReplicaMaintain() throws Exception { - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); - Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); - validateDelayedMovements(externalViewsBefore); - - // bring down another node, the minimal active replica for each partition should be maintained. - _participants.get(3).syncStop(); - Thread.sleep(500); - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - } - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); - } - - /** - * The partititon should be moved to other nodes after the delay time - */ - @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"}) - public void testPartitionMovementAfterDelayTime() throws Exception { - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - - long delay = 4000; - Map<String, ExternalView> externalViewsBefore = createTestDBs(delay); - validateDelayedMovements(externalViewsBefore); - - Thread.sleep(delay + 200); - Assert.assertTrue(_clusterVerifier.verify()); - // after delay time, it should maintain required number of replicas. - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); - } - } - - @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"}) - public void testDisableDelayRebalanceInResource() throws Exception { - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); - validateDelayedMovements(externalViewsBefore); - - // disable delay rebalance for one db, partition should be moved immediately - String testDb = _testDBs.get(0); - IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState( - CLUSTER_NAME, testDb); - idealState.setDelayRebalanceEnabled(false); - _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState); - Thread.sleep(1000); - - - // once delay rebalance is disabled, it should maintain required number of replicas for that db. - // replica for other dbs should not be moved. - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = - _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - - if (db.equals(testDb)) { - validateMinActiveAndTopStateReplica(idealState, ev, _replica, NUM_NODE); - } else { - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), false); - } - } - } - - @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"}) - public void testDisableDelayRebalanceInCluster() throws Exception { - enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); - - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); - validateDelayedMovements(externalViewsBefore); - - // disable delay rebalance for the entire cluster. - enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false); - // TODO: remove this once controller is listening on cluster config change. - RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(), _testDBs.get(0)); - Thread.sleep(500); - Assert.assertTrue(_clusterVerifier.verify()); - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); - } - - enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); - } - - @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"}) - public void testDisableDelayRebalanceInInstance() throws Exception { - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); - validateDelayedMovements(externalViewsBefore); - - String disabledInstanceName = _participants.get(0).getInstanceName(); - enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, false); - Thread.sleep(1000); - - for (String db : _testDBs) { - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - Map<String, List<String>> preferenceLists = is.getPreferenceLists(); - for (List<String> instances : preferenceLists.values()) { - Assert.assertFalse(instances.contains(disabledInstanceName)); - } - } - enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, true); - } - - @AfterMethod - public void afterTest() throws InterruptedException { - // delete all DBs create in last test - for (String db : _testDBs) { - _setupTool.dropResourceFromCluster(CLUSTER_NAME, db); - } - _testDBs.clear(); - Thread.sleep(50); - } - - @BeforeMethod - public void beforeTest() { - // restart any participant that has been disconnected from last test. - for (int i = 0; i < _participants.size(); i++) { - if (!_participants.get(i).isConnected()) { - _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, - _participants.get(i).getInstanceName())); - _participants.get(i).syncStart(); - } - } - } - - // create test DBs, wait it converged and return externalviews - protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException { - Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); - int i = 0; - for (String stateModel : TestStateModels) { - String db = "Test-DB-" + i++; - createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, - _minActiveReplica, delayTime); - _testDBs.add(db); - } - Thread.sleep(800); - Assert.assertTrue(_clusterVerifier.verify()); - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - externalViews.put(db, ev); - } - return externalViews; - } - - protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, ExternalView evAfter, - String offlineInstance, boolean disabled) { - for (String partition : is.getPartitionSet()) { - Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition); - Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition); - - Set<String> instancesBefore = new HashSet<String>(assignmentsBefore.keySet()); - Set<String> instancesAfter = new HashSet<String>(assignmentsAfter.keySet()); - - if (disabled) { - // the offline node is disabled - Assert.assertEquals(instancesBefore, instancesAfter, String - .format("%s has been moved to new instances, before: %s, after: %s, disabled instance:", - partition, assignmentsBefore.toString(), assignmentsAfter.toString(), - offlineInstance)); - - if (instancesAfter.contains(offlineInstance)) { - Assert.assertEquals(assignmentsAfter.get(offlineInstance), "OFFLINE"); - } - } else { - // the offline node actually went offline. - instancesBefore.remove(offlineInstance); - Assert.assertEquals(instancesBefore, instancesAfter, String - .format("%s has been moved to new instances, before: %s, after: %s, offline instance:", - partition, assignmentsBefore.toString(), assignmentsAfter.toString(), - offlineInstance)); - } - } - } - - private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore) - throws InterruptedException { - // bring down one node, no partition should be moved. - _participants.get(0).syncStop(); - Thread.sleep(500); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), false); - } - } - - @AfterClass - public void afterClass() throws Exception { - /** - * shutdown order: 1) disconnect the controller 2) disconnect participants - */ - _controller.syncStop(); - for (MockParticipantManager participant : _participants) { - participant.syncStop(); - } - _setupTool.deleteCluster(CLUSTER_NAME); - System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java deleted file mode 100644 index 5df2563..0000000 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java +++ /dev/null @@ -1,317 +0,0 @@ -package org.apache.helix.integration.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; -import org.apache.helix.ConfigAccessor; -import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAutoRebalance { - private ConfigAccessor _configAccessor; - - @BeforeClass - public void beforeClass() throws Exception { - super.beforeClass(); - _configAccessor = new ConfigAccessor(_gZkClient); - } - - - /** - * The partition movement should be delayed (not happen immediately) after one single node is disabled. - * Delay is enabled by default, delay time is set in IdealState. - * @throws Exception - */ - @Test - @Override - public void testDelayedPartitionMovement() throws Exception { - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); - - // Disable one node, no partition should be moved. - String instance = _participants.get(0).getInstanceName(); - enableInstance(instance, false); - - Thread.sleep(300); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, instance, true); - } - } - - @Test(dependsOnMethods = {"testDelayedPartitionMovement"}) - @Override - public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception { - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); - - Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); - - // Disable one node, no partition should be moved. - String instance = _participants.get(0).getInstanceName(); - enableInstance(instance, false); - - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), true); - } - - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); - } - - /** - * Test when two nodes were disabled, the minimal active replica should be maintained. - * @throws Exception - */ - @Test(dependsOnMethods = {"testDelayedPartitionMovement"}) - @Override - public void testMinimalActiveReplicaMaintain() throws Exception { - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); - Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); - - // disable one node, no partition should be moved. - enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), true); - } - - // disable another node, the minimal active replica for each partition should be maintained. - enableInstance(_participants.get(3).getInstanceName(), false); - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - } - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); - } - - /** - * Test when one node is disable while another node is offline, the minimal active replica should be maintained. - * @throws Exception - */ - @Test(dependsOnMethods = {"testDelayedPartitionMovement"}) - public void testMinimalActiveReplicaMaintainWithOneOffline() throws Exception { - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); - Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); - - // disable one node, no partition should be moved. - enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), true); - } - - // bring down another node, the minimal active replica for each partition should be maintained. - _participants.get(3).syncStop(); - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - } - setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); - } - - /** - * The partititon should be moved to other nodes after the delay time - */ - @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"}) - @Override - public void testPartitionMovementAfterDelayTime() throws Exception { - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - - long delay = 10000; - Map<String, ExternalView> externalViewsBefore = createTestDBs(delay); - - // disable one node, no partition should be moved. - enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), true); - } - - Thread.sleep(delay + 200); - // after delay time, it should maintain required number of replicas. - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); - } - } - - @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"}) - @Override - public void testDisableDelayRebalanceInResource() throws Exception { - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); - - // disable one node, no partition should be moved. - enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), true); - } - - // disable delay rebalance for one db, partition should be moved immediately - String testDb = _testDBs.get(0); - IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState( - CLUSTER_NAME, testDb); - idealState.setDelayRebalanceEnabled(false); - _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState); - Thread.sleep(2000); - Assert.assertTrue(_clusterVerifier.verify()); - - // once delay rebalance is disabled, it should maintain required number of replicas for that db. - // replica for other dbs should not be moved. - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = - _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - - if (db.equals(testDb)) { - validateMinActiveAndTopStateReplica(idealState, ev, _replica, NUM_NODE); - } else { - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), true); - } - } - } - - @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"}) - @Override - public void testDisableDelayRebalanceInCluster() throws Exception { - enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); - - Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000); - - // disable one node, no partition should be moved. - enableInstance(_participants.get(0).getInstanceName(), false); - Thread.sleep(100); - Assert.assertTrue(_clusterVerifier.verify()); - - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); - validateNoPartitionMove(is, externalViewsBefore.get(db), ev, - _participants.get(0).getInstanceName(), true); - } - - // disable delay rebalance for the entire cluster. - enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false); - // TODO: remove this once controller is listening on cluster config change. - RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(), _testDBs.get(0)); - Thread.sleep(500); - Assert.assertTrue(_clusterVerifier.verify()); - for (String db : _testDBs) { - ExternalView ev = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState( - CLUSTER_NAME, db); - validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); - } - - enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); - } - - @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"}) - public void testDisableDelayRebalanceInInstance() throws Exception { - super.testDisableDelayRebalanceInInstance(); - } - - @BeforeMethod - public void beforeTest() { - // restart any participant that has been disconnected from last test. - for (int i = 0; i < _participants.size(); i++) { - if (!_participants.get(i).isConnected()) { - _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, - _participants.get(i).getInstanceName())); - _participants.get(i).syncStart(); - } - enableInstance(_participants.get(i).getInstanceName(), true); - } - } - - private void enableInstance(String instance, boolean enabled) { - // Disable one node, no partition should be moved. - long currentTime = System.currentTimeMillis(); - _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled); - InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance); - Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled); - Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime); - Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 100); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithRackaware.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithRackaware.java deleted file mode 100644 index 0f6c639..0000000 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestDelayedAutoRebalanceWithRackaware.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.apache.helix.integration.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Date; -import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.IdealState; -import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; -import org.testng.annotations.BeforeClass; - -public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance { - final int NUM_NODE = 9; - - @BeforeClass - public void beforeClass() throws Exception { - System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursive(namespace); - } - _setupTool = new ClusterSetup(_gZkClient); - _setupTool.addCluster(CLUSTER_NAME, true); - - for (int i = 0; i < NUM_NODE; i++) { - String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); - _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); - String zone = "zone-" + i % 3; - _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone); - - // start dummy participants - MockParticipantManager participant = - new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); - participant.syncStart(); - _participants.add(participant); - } - - enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); - - // start controller - String controllerName = CONTROLLER_PREFIX + "_0"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); - _controller.syncStart(); - - _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); - } - - @Override - protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, - String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { - return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica, - minActiveReplica, delay, CrushRebalanceStrategy.class.getName()); - } -}