junkaixue commented on code in PR #2772:
URL: https://github.com/apache/helix/pull/2772#discussion_r1550872329


##########
helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java:
##########
@@ -321,29 +330,93 @@ public String getInstanceDisabledReason() {
    *         Default is am empty string.
    */
   public String getInstanceDisabledType() {
-    if (getInstanceEnabled()) {
+    if 
(!getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
       return InstanceConstants.INSTANCE_NOT_DISABLED;
     }
     return 
_record.getStringField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
         
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
   }
 
   /**
-   * Get the timestamp (milliseconds from epoch) when this instance was 
enabled/disabled last time.
+   * Set the instance operation for this instance.
    *
-   * @return
+   * @param operation the instance operation
    */
-  public long getInstanceEnabledTime() {
-    return 
_record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
-  }
-
   public void setInstanceOperation(InstanceConstants.InstanceOperation 
operation) {
     _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
         operation == null ? "" : operation.name());
+    if (operation == null || operation == 
InstanceConstants.InstanceOperation.ENABLE
+        || operation == InstanceConstants.InstanceOperation.DISABLE) {
+      // We are still setting the HELIX_ENABLED field for backwards 
compatibility.
+      // It is possible that users will be using earlier version of HelixAdmin 
or helix-rest
+      // is on older version.
+      // TODO: Remove this when we are sure that all users are using the new 
field INSTANCE_OPERATION.
+      setInstanceEnabledHelper(!(operation == 
InstanceConstants.InstanceOperation.DISABLE));
+    }
+  }
+
+  private void setInstanceOperationInit(InstanceConstants.InstanceOperation 
operation) {
+    if (operation == null) {
+      return;
+    }
+    _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), 
operation.name());
+  }
+
+  /**
+   * Get the InstanceOperation of this instance, default is ENABLE if nothing 
is set. If
+   * HELIX_ENABLED is set to false, then the instance operation is DISABLE for 
backwards
+   * compatibility.
+   *
+   * @return the instance operation
+   */
+  public InstanceConstants.InstanceOperation getInstanceOperation() {
+    String instanceOperationString =
+        
_record.getSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name());
+
+    InstanceConstants.InstanceOperation instanceOperation;
+    try {
+      // If INSTANCE_OPERATION is not set, then the instance is enabled.
+      instanceOperation = instanceOperationString == null || 
instanceOperationString.isEmpty()

Review Comment:
   Need parenthesis for check null and empty?



##########
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java:
##########
@@ -509,62 +413,107 @@ public void enableInstance(String clusterName, 
List<String> instances, boolean e
     //enableInstance(clusterName, instances, enabled, null, null);
   }
 
+  private void validateInstanceOperationTransition(InstanceConfig 
instanceConfig,
+      InstanceConfig matchingLogicalIdInstance,
+      InstanceConstants.InstanceOperation currentOperation,
+      InstanceConstants.InstanceOperation targetOperation,
+      String clusterName) {
+    boolean targetStateEnableOrDisable =
+        targetOperation.equals(InstanceConstants.InstanceOperation.ENABLE)
+            || 
targetOperation.equals(InstanceConstants.InstanceOperation.DISABLE);
+    switch (currentOperation) {
+      case ENABLE:
+      case DISABLE:
+        // ENABLE or DISABLE can be set to ENABLE, DISABLE, or EVACUATE at any 
time.
+        if (ImmutableSet.of(InstanceConstants.InstanceOperation.ENABLE,
+            InstanceConstants.InstanceOperation.DISABLE,
+            
InstanceConstants.InstanceOperation.EVACUATE).contains(targetOperation)) {
+          return;
+        }
+      case SWAP_IN:
+        // We can only ENABLE or DISABLE a SWAP_IN instance if there is an 
instance with matching logicalId
+        // with an InstanceOperation set to UNKNOWN.
+        if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+            || matchingLogicalIdInstance.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.UNKNOWN))) || 
targetOperation.equals(
+            InstanceConstants.InstanceOperation.UNKNOWN)) {
+          return;
+        }
+      case EVACUATE:
+        // EVACUATE can only be set to ENABLE or DISABLE when there is no 
instance with the same
+        // logicalId in the cluster.
+        if ((targetStateEnableOrDisable && matchingLogicalIdInstance == null)
+            || 
targetOperation.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
+          return;
+        }
+      case UNKNOWN:
+        // UNKNOWN can be set to ENABLE or DISABLE when there is no instance 
with the same logicalId in the cluster
+        // or the instance with the same logicalId in the cluster has 
InstanceOperation set to EVACUATE.
+        // UNKNOWN can be set to SWAP_IN when there is an instance with the 
same logicalId in the cluster set to ENABLE,
+        // or DISABLE.
+        if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+            || matchingLogicalIdInstance.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.EVACUATE)))) {
+          return;
+        } else if 
(targetOperation.equals(InstanceConstants.InstanceOperation.SWAP_IN)
+            && matchingLogicalIdInstance != null && !ImmutableSet.of(
+                InstanceConstants.InstanceOperation.UNKNOWN,
+                InstanceConstants.InstanceOperation.EVACUATE)
+            .contains(matchingLogicalIdInstance.getInstanceOperation())) {
+          // Get the topology key used to determine the logicalId of a node.
+          ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
+          ClusterTopologyConfig clusterTopologyConfig =
+              ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+          String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
+          // If the existing instance with the same logicalId is not in the 
same FAULT_ZONE as this instance, we cannot
+          // add this instance.
+          if 
(!matchingLogicalIdInstance.getDomainAsMap().containsKey(faultZoneKey)

Review Comment:
   This is too strong right? We should allow them to add instance with same 
logic id but different zone. Then the cluster should be able to rebalance 
instead of disallowing it.



##########
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java:
##########
@@ -131,82 +135,102 @@ public void process(ClusterEvent event) throws Exception 
{
     });
   }
 
+  private String selectSwapInState(StateModelDefinition stateModelDef, 
Map<String, String> stateMap,
+      String swapOutInstance) {
+    // If the swap-in node is live, select state with the following logic:
+    // 1. If the swap-out instance's replica is in the stateMap:
+    // - if the swap-out instance's replica is a topState, select the swap-in 
instance's replica to the topState.
+    //   if another is allowed to be added, otherwise select the swap-in 
instance's replica to a secondTopState.
+    // - if the swap-out instance's replica is not a topState or ERROR, select 
the swap-in instance's replica to the same state.
+    // - if the swap-out instance's replica is ERROR, select the swap-in 
instance's replica to the initialState.
+    // 2. If the swap-out instance's replica is not in the stateMap, select 
the swap-in instance's replica to the initialState.
+    // This happens when the swap-out node is offline.
+    if (stateMap.containsKey(swapOutInstance)) {
+      if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState()) || 
stateMap.get(
+          swapOutInstance).equals(HelixDefinedState.ERROR.name())) {
+        // If the swap-out instance's replica is a topState, select the 
swap-in instance's replica
+        // to be the topState if the StateModel allows another to be added. If 
not, select the swap-in
+        // to be the secondTopState.
+        String topStateCount = 
stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
+        if 
(topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+            || 
topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
+          // If the StateModel allows for another replica with the topState to 
be added,
+          // select the swap-in instance's replica to the topState.
+          return stateModelDef.getTopState();
+        } else {
+          // If StateModel does not allow another topState replica to be
+          // added, select the swap-in instance's replica to be the 
secondTopState.
+          return stateModelDef.getSecondTopStates().iterator().next();
+        }
+      } else {
+        // If the swap-out instance's replica is not a topState or ERROR, 
select the swap-in instance's replica
+        // to be the same state
+        return stateMap.get(swapOutInstance);
+      }
+    } else {
+      // If the swap-out instance's replica is not in the stateMap, select the 
swap-in instance's replica
+      // to be the initialState. This happens when the swap-out node is 
offline.
+      return stateModelDef.getInitialState();

Review Comment:
   This is confusing to me. If the instance is not state map, it suppose not be 
in preference list. Also, the original swap out one is already dropped. Then we 
should not even to put a state to that instance, I think?



##########
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java:
##########
@@ -131,82 +135,102 @@ public void process(ClusterEvent event) throws Exception 
{
     });
   }
 
+  private String selectSwapInState(StateModelDefinition stateModelDef, 
Map<String, String> stateMap,
+      String swapOutInstance) {
+    // If the swap-in node is live, select state with the following logic:
+    // 1. If the swap-out instance's replica is in the stateMap:
+    // - if the swap-out instance's replica is a topState, select the swap-in 
instance's replica to the topState.
+    //   if another is allowed to be added, otherwise select the swap-in 
instance's replica to a secondTopState.
+    // - if the swap-out instance's replica is not a topState or ERROR, select 
the swap-in instance's replica to the same state.
+    // - if the swap-out instance's replica is ERROR, select the swap-in 
instance's replica to the initialState.
+    // 2. If the swap-out instance's replica is not in the stateMap, select 
the swap-in instance's replica to the initialState.
+    // This happens when the swap-out node is offline.
+    if (stateMap.containsKey(swapOutInstance)) {
+      if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState()) || 
stateMap.get(
+          swapOutInstance).equals(HelixDefinedState.ERROR.name())) {
+        // If the swap-out instance's replica is a topState, select the 
swap-in instance's replica
+        // to be the topState if the StateModel allows another to be added. If 
not, select the swap-in
+        // to be the secondTopState.
+        String topStateCount = 
stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
+        if 
(topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+            || 
topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
+          // If the StateModel allows for another replica with the topState to 
be added,
+          // select the swap-in instance's replica to the topState.
+          return stateModelDef.getTopState();
+        } else {
+          // If StateModel does not allow another topState replica to be
+          // added, select the swap-in instance's replica to be the 
secondTopState.
+          return stateModelDef.getSecondTopStates().iterator().next();
+        }

Review Comment:
   Usually, if has a return statement, there is no need to specific the else 
clause.



##########
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java:
##########
@@ -206,113 +205,29 @@ public void addInstance(String clusterName, 
InstanceConfig instanceConfig) {
       throw new HelixException("Node " + nodeId + " already exists in cluster 
" + clusterName);
     }
 
-    if (!ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE.contains(
-        instanceConfig.getInstanceOperation())) {
+    List<InstanceConfig> matchingLogicalIdInstances =
+        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+    if (matchingLogicalIdInstances.size() > 1) {
       throw new HelixException(
-          "Instance can only be added if InstanceOperation is set to one of" + 
"the following: "
-              + ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE + " This 
instance: " + nodeId
-              + " has InstanceOperation set to " + 
instanceConfig.getInstanceOperation());
+          "There are already more than one instance with the same logicalId in 
the cluster: "
+              + 
matchingLogicalIdInstances.stream().map(InstanceConfig::getInstanceName)
+              .collect(Collectors.joining(", "))
+              + " Please make sure there is at most 2 instance with the same 
logicalId in the cluster.");
     }
 
-    // Get the topology key used to determine the logicalId of a node.
-    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
-    ClusterTopologyConfig clusterTopologyConfig =
-        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
-    String logicalIdKey = clusterTopologyConfig.getEndNodeType();
-    String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
-    String toAddInstanceLogicalId = instanceConfig.getLogicalId(logicalIdKey);
-
-    HelixConfigScope instanceConfigScope =
-        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
-            clusterName).build();
-    List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
-    List<InstanceConfig> foundInstanceConfigsWithMatchingLogicalId =
-        existingInstanceIds.parallelStream()
-            .map(existingInstanceId -> getInstanceConfig(clusterName, 
existingInstanceId)).filter(
-                existingInstanceConfig -> 
existingInstanceConfig.getLogicalId(logicalIdKey)
-                    
.equals(toAddInstanceLogicalId)).collect(Collectors.toList());
-
-    if (foundInstanceConfigsWithMatchingLogicalId.size() >= 2) {
-      // If the length is 2, we cannot add an instance with the same logicalId 
as an existing instance
-      // regardless of InstanceOperation.
-      throw new HelixException(
-          "There can only be 2 instances with the same logicalId in a cluster. 
"
-              + "Existing instances: " + 
foundInstanceConfigsWithMatchingLogicalId.get(0)
-              .getInstanceName() + " and " + 
foundInstanceConfigsWithMatchingLogicalId.get(1)
-              .getInstanceName() + " already have the same logicalId: " + 
toAddInstanceLogicalId
-              + "; therefore, " + nodeId + " cannot be added to the cluster.");
-    } else if (foundInstanceConfigsWithMatchingLogicalId.size() == 1) {
-      // If there is only one instance with the same logicalId,
-      // we can infer that the intended behaviour is to SWAP_IN or EVACUATE + 
ADD.
-      if 
(foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
-        // If the existing instance with the same logicalId has SWAP_OUT 
InstanceOperation
-
-        // If the InstanceOperation is unset, we will set it to SWAP_IN.
-        if (!instanceConfig.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
-          
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
-        }
-
-        // If the existing instance with the same logicalId is not in the same 
FAULT_ZONE as this instance, we cannot
-        // add this instance.
-        if (!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap()
-            .containsKey(faultZoneKey) || 
!instanceConfig.getDomainAsMap().containsKey(faultZoneKey)
-            || 
!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap().get(faultZoneKey)
-            .equals(instanceConfig.getDomainAsMap().get(faultZoneKey))) {
-          throw new HelixException(
-              "Instance can only be added if the SWAP_OUT instance sharing the 
same logicalId is in the same FAULT_ZONE"
-                  + " as this instance. " + "Existing instance: "
-                  + 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
-                  + " has FAULT_ZONE_TYPE: " + 
foundInstanceConfigsWithMatchingLogicalId.get(0)
-                  .getDomainAsMap().get(faultZoneKey) + " and this instance: " 
+ nodeId
-                  + " has FAULT_ZONE_TYPE: " + 
instanceConfig.getDomainAsMap().get(faultZoneKey));
-        }
-
-        Map<String, Integer> foundInstanceCapacityMap =
-            
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap().isEmpty()
-                ? clusterConfig.getDefaultInstanceCapacityMap()
-                : 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap();
-        Map<String, Integer> instanceCapacityMap = 
instanceConfig.getInstanceCapacityMap().isEmpty()
-            ? clusterConfig.getDefaultInstanceCapacityMap()
-            : instanceConfig.getInstanceCapacityMap();
-        // If the instance does not have the same capacity, we cannot add this 
instance.
-        if (!new EqualsBuilder().append(foundInstanceCapacityMap, 
instanceCapacityMap).isEquals()) {
-          throw new HelixException(
-              "Instance can only be added if the SWAP_OUT instance sharing the 
same logicalId has the same capacity"
-                  + " as this instance. " + "Existing instance: "
-                  + 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
-                  + " has capacity: " + foundInstanceCapacityMap + " and this 
instance: " + nodeId
-                  + " has capacity: " + instanceCapacityMap);
-        }
-      } else if 
(foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
-        // No need to check anything on the new node, the old node will be 
evacuated and the new node
-        // will be added.
-      } else {
-        // If the instanceConfig.getInstanceEnabled() is true and the existing 
instance with the same logicalId
-        // does not have InstanceOperation set to one of the above, we cannot 
add this instance.
-        throw new HelixException(
-            "Instance can only be added if the exising instance sharing the 
same logicalId"
-                + " has InstanceOperation set to "
-                + InstanceConstants.InstanceOperation.SWAP_OUT.name()
-                + " and this instance has InstanceOperation set to "
-                + InstanceConstants.InstanceOperation.SWAP_IN.name()
-                + " or the existing instance sharing the same logicalId has 
Instance Operation set to "
-                + InstanceConstants.InstanceOperation.EVACUATE.name()
-                + " and this instance has InstanceOperation unset. Existing 
instance: "
-                + 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
-                + " has InstanceOperation: " + 
foundInstanceConfigsWithMatchingLogicalId.get(0)
-                .getInstanceOperation());
-      }
-    } else if (!instanceConfig.getInstanceOperation().isEmpty()) {
-      // If there are no instances with the same logicalId, we can only add 
this instance if InstanceOperation
-      // is unset because it is a new instance.
-      throw new HelixException(
-          "There is no instance with logicalId: " + toAddInstanceLogicalId + " 
in cluster: "
-              + clusterName + "; therefore, " + nodeId
-              + " cannot join cluster with InstanceOperation set to "
-              + instanceConfig.getInstanceOperation() + ".");
+    InstanceConstants.InstanceOperation attemptedInstanceOperation =
+        instanceConfig.getInstanceOperation();
+    try {
+      validateInstanceOperationTransition(instanceConfig,
+          !matchingLogicalIdInstances.isEmpty() ? 
matchingLogicalIdInstances.get(0) : null,
+          InstanceConstants.InstanceOperation.UNKNOWN,
+          attemptedInstanceOperation, clusterName);
+    } catch (HelixException e) {
+      
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
+      logger.error("Failed to add instance " + 
instanceConfig.getInstanceName() + " to cluster "
+          + clusterName + " with instance operation " + 
attemptedInstanceOperation
+          + ". Setting INSTANCE_OPERATION to " + 
instanceConfig.getInstanceOperation()
+          + " instead.", e);

Review Comment:
   Be careful with logging, if pipeline consistent happening, then the log with 
stack trace will blast the server.



##########
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java:
##########
@@ -146,6 +148,16 @@ private void generateMessage(final Resource resource, 
final BaseControllerDataPr
         }
       }
 
+      // Look through the current state map and add DROPPED message if the 
instance is not in the
+      // resourceStateMap. This instance may not have had been dropped by the 
rebalance strategy.
+      // This check is required to ensure that the instances removed from the 
ideal state stateMap
+      // are properly dropped.
+      for (String instance : currentStateMap.keySet()) {
+        if (!instanceStateMap.containsKey(instance)) {
+          instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());

Review Comment:
   Make sure this is just temp usage. Not persist back to ZK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to