desaikomal commented on code in PR #2578: URL: https://github.com/apache/helix/pull/2578#discussion_r1276858272
########## helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java: ########## @@ -0,0 +1,277 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBucketDataAccessor; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.participant.StateMachineEngine; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * This test case is specially targeting for n - n+1 issue. + * + * Initially, all the partitions are equal size and equal weight + * from both CU, DISK. + * All the nodes are equally loaded. + * The test case will do the following: + * For one resource, we will double its CU weight. + * The rebalancer will be triggered. + * + * We have a monitoring thread which is constantly monitoring the instance capacity. 
+ * - It looks at current state resource assignment + pending messages + * - it has ASSERT in place to make sure we NEVER cross instance capacity (CU) + */ +public class TestWagedLoadedCluster extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 13000; + protected static final int PARTITIONS = 10; + + protected static final String CLASS_NAME = TestWagedLoadedCluster.class.getSimpleName(); + protected static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + protected AssignmentMetadataStore _assignmentMetadataStore; + List<MockParticipantManager> _participants = new ArrayList<>(); + List<String> _nodes = new ArrayList<>(); + private final Set<String> _allDBs = new HashSet<>(); + + private CountDownLatch _completedTest = new CountDownLatch(1); + private CountDownLatch _weightUpdatedLatch = new CountDownLatch(1); // when 0, weight is updated + private final Map<String, Integer> _defaultInstanceCapacity = + ImmutableMap.of("CU", 50, "DISK", 50); + + private final Map<String, Integer> _defaultPartitionWeight = + ImmutableMap.of("CU", 10, "DISK", 10); + + private final Map<String, Integer> _newPartitionWeight = + ImmutableMap.of("CU", 20, "DISK", 10); + private final int DEFAULT_DELAY = 500; // 0.5 second + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + @BeforeClass + public void beforeClass() throws Exception { + LOG.info("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); Review Comment: I will do this as a follow-up change. Thanks. ########## helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedLoadedCluster.java: ########## @@ -0,0 +1,277 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBucketDataAccessor; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.participant.StateMachineEngine; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import
org.testng.annotations.Test; + +/** + * This test case is specially targeting for n - n+1 issue. + * + * Initially, all the partitions are equal size and equal weight + * from both CU, DISK. + * All the nodes are equally loaded. + * The test case will do the following: + * For one resource, we will double its CU weight. + * The rebalancer will be triggered. + * + * We have a monitoring thread which is constantly monitoring the instance capacity. + * - It looks at current state resource assignment + pending messages + * - it has ASSERT in place to make sure we NEVER cross instance capacity (CU) + */ +public class TestWagedLoadedCluster extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 13000; + protected static final int PARTITIONS = 10; + + protected static final String CLASS_NAME = TestWagedLoadedCluster.class.getSimpleName(); + protected static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + protected AssignmentMetadataStore _assignmentMetadataStore; + List<MockParticipantManager> _participants = new ArrayList<>(); + List<String> _nodes = new ArrayList<>(); + private final Set<String> _allDBs = new HashSet<>(); + + private CountDownLatch _completedTest = new CountDownLatch(1); + private CountDownLatch _weightUpdatedLatch = new CountDownLatch(1); // when 0, weight is updated + private final Map<String, Integer> _defaultInstanceCapacity = + ImmutableMap.of("CU", 50, "DISK", 50); + + private final Map<String, Integer> _defaultPartitionWeight = + ImmutableMap.of("CU", 10, "DISK", 10); + + private final Map<String, Integer> _newPartitionWeight = + ImmutableMap.of("CU", 20, "DISK", 10); + private final int DEFAULT_DELAY = 500; // 0.5 second + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + @BeforeClass + public void beforeClass() throws Exception { + LOG.info("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); 
+ _gSetupTool.addCluster(CLUSTER_NAME, true); + // create 6 node cluster + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _nodes.add(storageNodeName); + } + // ST downward messages will be delayed by DEFAULT_DELAY (0.5 sec). + startParticipants(DEFAULT_DELAY); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + _assignmentMetadataStore = + new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) { + public Map<String, ResourceAssignment> getBaseline() { + // Ensure this metadata store always reads from ZK without using the cache. + super.reset(); + return super.getBaseline(); + } + + public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() { + // Ensure this metadata store always reads from ZK without using the cache. + super.reset(); + return super.getBestPossibleAssignment(); + } + }; + + // Set test instance capacity and partition weights + HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + ClusterConfig clusterConfig = + dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig()); + + clusterConfig.setDefaultInstanceCapacityMap(_defaultInstanceCapacity); + clusterConfig.setDefaultPartitionWeightMap(_defaultPartitionWeight); + dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), clusterConfig); + + // Create 3 resources with 2 partitions each.
+ for (int i = 0; i < 3; i++) { + String db = "Test-WagedDB-" + i; + createResourceWithWagedRebalance(CLUSTER_NAME, db, BuiltInStateModelDefinitions.MasterSlave.name(), + 2 /*numPartitions*/, 3 /*replicas*/, 3 /*minActiveReplicas*/); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, 3); + _allDBs.add(db); + } + + // Start a thread which will keep validating instance usage using currentState and pending messages. + Thread validateInstanceUsageThread = new Thread(new Runnable() { Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
