zpinto commented on code in PR #2604:
URL: https://github.com/apache/helix/pull/2604#discussion_r1302235302


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -103,16 +104,16 @@ public static Set<String> getActiveNodes(Set<String> 
allNodes, IdealState idealS
       Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, 
Set<String> liveNodes,
       Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig 
clusterConfig) {
     if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
-      return new HashSet<>(liveEnabledNodes);
+      return filterEvacuatingInstances(instanceConfigMap, liveEnabledNodes);

Review Comment:
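   Same comment as on the other `getActiveNodes` overload above: please update the docstring to say that evacuating nodes are not considered active.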



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -128,6 +129,14 @@ private static Set<String> getActiveNodes(Set<String> 
allNodes, Set<String> live
     return activeNodes;
   }
 
+  private static Set<String> filterEvacuatingInstances(Map<String, 
InstanceConfig> instanceConfigMap,
+      Set<String> liveEnabledNodes) {

Review Comment:
   Would recommend naming the variable something more generic than 
`liveEnabledNodes` (maybe just `nodes`), as we could later use this method 
elsewhere in the class to filter out evacuating nodes. We cannot, and should 
not, guarantee that this method is always called with live and enabled nodes.



##########
helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java:
##########
@@ -0,0 +1,421 @@
+package org.apache.helix.integration.rebalancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixRollbackException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestInstanceOperation extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private int REPLICA = 3;
+  protected ClusterControllerManager _controller;
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  List<String> _participantNames = new ArrayList<>();
+  private Set<String> _allDBs = new HashSet<>();
+  private ZkHelixClusterVerifier _clusterVerifier;
+  private ConfigAccessor _configAccessor;
+  private long _stateModelDelay = 30L;
+  protected AssignmentMetadataStore _assignmentMetadataStore;
+  HelixDataAccessor _dataAccessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      addParticipant(participantName);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+    _clusterVerifier = new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setDeactivatedNodeAwareness(true)
+        .setResources(_allDBs)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.stateTransitionCancelEnabled(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    createTestDBs(200);
+
+    setUpWagedBaseline();
+  }
+
+  @Test
+  public void testEvacuate() throws Exception {
+    // EV should contain all participants, check resources one by one
+    Map<String, ExternalView> assignment = getEV();
+    for (String resource : _allDBs) {
+      
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+    }
+
+    // cancel an evacuated instance
+    String instanceToEvacuate = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.EVACUATE);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // New ev should contain all instances but the evacuated one
+    assignment = getEV();
+    List<String> currentActiveInstances =
+        _participantNames.stream().filter(n -> 
!n.equals(instanceToEvacuate)).collect(Collectors.toList());
+    for (String resource : _allDBs) {
+      validateAssignmentInEv(assignment.get(resource));
+      Set<String> newPAssignedParticipants = 
getParticipantsInEv(assignment.get(resource));
+      
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
+      
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
+    }
+  }
+
+  @Test(dependsOnMethods = "testEvacuate")
+  public void testRevertEvacuation() throws Exception {
+
+    // evacuate an instance

Review Comment:
   Should this comment say "revert an evacuation" instead of "evacuate an instance"?



##########
helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java:
##########
@@ -0,0 +1,421 @@
+package org.apache.helix.integration.rebalancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixRollbackException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestInstanceOperation extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private int REPLICA = 3;
+  protected ClusterControllerManager _controller;
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  List<String> _participantNames = new ArrayList<>();
+  private Set<String> _allDBs = new HashSet<>();
+  private ZkHelixClusterVerifier _clusterVerifier;
+  private ConfigAccessor _configAccessor;
+  private long _stateModelDelay = 30L;
+  protected AssignmentMetadataStore _assignmentMetadataStore;
+  HelixDataAccessor _dataAccessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      addParticipant(participantName);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+    _clusterVerifier = new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setDeactivatedNodeAwareness(true)
+        .setResources(_allDBs)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.stateTransitionCancelEnabled(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    createTestDBs(200);
+
+    setUpWagedBaseline();
+  }
+
+  @Test
+  public void testEvacuate() throws Exception {
+    // EV should contain all participants, check resources one by one
+    Map<String, ExternalView> assignment = getEV();
+    for (String resource : _allDBs) {
+      
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+    }
+
+    // cancel an evacuated instance

Review Comment:
   Why does this comment say "cancel"? This step sets the instance operation to EVACUATE; it does not cancel anything.



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -89,7 +90,7 @@ public static Set<String> getActiveNodes(Set<String> 
allNodes, Set<String> liveE
       Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
       Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
     if (!isDelayRebalanceEnabled(clusterConfig)) {
-      return new HashSet<>(liveEnabledNodes);
+      return filterEvacuatingInstances(instanceConfigMap, liveEnabledNodes);

Review Comment:
   Please update the docstring to say that "evacuating" nodes are not 
considered active.



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -128,6 +129,14 @@ private static Set<String> getActiveNodes(Set<String> 
allNodes, Set<String> live
     return activeNodes;
   }
 
+  private static Set<String> filterEvacuatingInstances(Map<String, 
InstanceConfig> instanceConfigMap,

Review Comment:
   Would recommend renaming to `filterOutEvacuatingInstances`, as 
`filterEvacuatingInstances` sounds like it should return the EVACUATE 
nodes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to