junkaixue commented on code in PR #2772:
URL: https://github.com/apache/helix/pull/2772#discussion_r1550872329
##########
helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java:
##########
@@ -321,29 +330,93 @@ public String getInstanceDisabledReason() {
* Default is an empty string.
*/
public String getInstanceDisabledType() {
- if (getInstanceEnabled()) {
+ if
(!getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
return InstanceConstants.INSTANCE_NOT_DISABLED;
}
return
_record.getStringField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
}
/**
- * Get the timestamp (milliseconds from epoch) when this instance was
enabled/disabled last time.
+ * Set the instance operation for this instance.
*
- * @return
+ * @param operation the instance operation
*/
- public long getInstanceEnabledTime() {
- return
_record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
- }
-
public void setInstanceOperation(InstanceConstants.InstanceOperation
operation) {
_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
operation == null ? "" : operation.name());
+ if (operation == null || operation ==
InstanceConstants.InstanceOperation.ENABLE
+ || operation == InstanceConstants.InstanceOperation.DISABLE) {
+ // We are still setting the HELIX_ENABLED field for backwards
compatibility.
+ // It is possible that users will be using earlier version of HelixAdmin
or helix-rest
+ // is on older version.
+ // TODO: Remove this when we are sure that all users are using the new
field INSTANCE_OPERATION.
+ setInstanceEnabledHelper(!(operation ==
InstanceConstants.InstanceOperation.DISABLE));
+ }
+ }
+
+ private void setInstanceOperationInit(InstanceConstants.InstanceOperation
operation) {
+ if (operation == null) {
+ return;
+ }
+ _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
operation.name());
+ }
+
+ /**
+ * Get the InstanceOperation of this instance, default is ENABLE if nothing
is set. If
+ * HELIX_ENABLED is set to false, then the instance operation is DISABLE for
backwards
+ * compatibility.
+ *
+ * @return the instance operation
+ */
+ public InstanceConstants.InstanceOperation getInstanceOperation() {
+ String instanceOperationString =
+
_record.getSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name());
+
+ InstanceConstants.InstanceOperation instanceOperation;
+ try {
+ // If INSTANCE_OPERATION is not set, then the instance is enabled.
+ instanceOperation = instanceOperationString == null ||
instanceOperationString.isEmpty()
Review Comment:
Are parentheses needed around the null and empty checks?
##########
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java:
##########
@@ -509,62 +413,107 @@ public void enableInstance(String clusterName,
List<String> instances, boolean e
//enableInstance(clusterName, instances, enabled, null, null);
}
+ private void validateInstanceOperationTransition(InstanceConfig
instanceConfig,
+ InstanceConfig matchingLogicalIdInstance,
+ InstanceConstants.InstanceOperation currentOperation,
+ InstanceConstants.InstanceOperation targetOperation,
+ String clusterName) {
+ boolean targetStateEnableOrDisable =
+ targetOperation.equals(InstanceConstants.InstanceOperation.ENABLE)
+ ||
targetOperation.equals(InstanceConstants.InstanceOperation.DISABLE);
+ switch (currentOperation) {
+ case ENABLE:
+ case DISABLE:
+ // ENABLE or DISABLE can be set to ENABLE, DISABLE, or EVACUATE at any
time.
+ if (ImmutableSet.of(InstanceConstants.InstanceOperation.ENABLE,
+ InstanceConstants.InstanceOperation.DISABLE,
+
InstanceConstants.InstanceOperation.EVACUATE).contains(targetOperation)) {
+ return;
+ }
+ case SWAP_IN:
+ // We can only ENABLE or DISABLE a SWAP_IN instance if there is an
instance with matching logicalId
+ // with an InstanceOperation set to UNKNOWN.
+ if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+ || matchingLogicalIdInstance.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.UNKNOWN))) ||
targetOperation.equals(
+ InstanceConstants.InstanceOperation.UNKNOWN)) {
+ return;
+ }
+ case EVACUATE:
+ // EVACUATE can only be set to ENABLE or DISABLE when there is no
instance with the same
+ // logicalId in the cluster.
+ if ((targetStateEnableOrDisable && matchingLogicalIdInstance == null)
+ ||
targetOperation.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
+ return;
+ }
+ case UNKNOWN:
+ // UNKNOWN can be set to ENABLE or DISABLE when there is no instance
with the same logicalId in the cluster
+ // or the instance with the same logicalId in the cluster has
InstanceOperation set to EVACUATE.
+ // UNKNOWN can be set to SWAP_IN when there is an instance with the
same logicalId in the cluster set to ENABLE,
+ // or DISABLE.
+ if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+ || matchingLogicalIdInstance.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.EVACUATE)))) {
+ return;
+ } else if
(targetOperation.equals(InstanceConstants.InstanceOperation.SWAP_IN)
+ && matchingLogicalIdInstance != null && !ImmutableSet.of(
+ InstanceConstants.InstanceOperation.UNKNOWN,
+ InstanceConstants.InstanceOperation.EVACUATE)
+ .contains(matchingLogicalIdInstance.getInstanceOperation())) {
+ // Get the topology key used to determine the logicalId of a node.
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
+ ClusterTopologyConfig clusterTopologyConfig =
+ ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+ String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
+ // If the existing instance with the same logicalId is not in the
same FAULT_ZONE as this instance, we cannot
+ // add this instance.
+ if
(!matchingLogicalIdInstance.getDomainAsMap().containsKey(faultZoneKey)
Review Comment:
Isn't this too strict? We should allow users to add an instance with the same
logical ID but in a different zone. The cluster should then be able to rebalance,
rather than rejecting the instance.
##########
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java:
##########
@@ -131,82 +135,102 @@ public void process(ClusterEvent event) throws Exception
{
});
}
+ private String selectSwapInState(StateModelDefinition stateModelDef,
Map<String, String> stateMap,
+ String swapOutInstance) {
+ // If the swap-in node is live, select state with the following logic:
+ // 1. If the swap-out instance's replica is in the stateMap:
+ // - if the swap-out instance's replica is a topState, select the swap-in
instance's replica to the topState.
+ // if another is allowed to be added, otherwise select the swap-in
instance's replica to a secondTopState.
+ // - if the swap-out instance's replica is not a topState or ERROR, select
the swap-in instance's replica to the same state.
+ // - if the swap-out instance's replica is ERROR, select the swap-in
instance's replica to the initialState.
+ // 2. If the swap-out instance's replica is not in the stateMap, select
the swap-in instance's replica to the initialState.
+ // This happens when the swap-out node is offline.
+ if (stateMap.containsKey(swapOutInstance)) {
+ if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState()) ||
stateMap.get(
+ swapOutInstance).equals(HelixDefinedState.ERROR.name())) {
+ // If the swap-out instance's replica is a topState, select the
swap-in instance's replica
+ // to be the topState if the StateModel allows another to be added. If
not, select the swap-in
+ // to be the secondTopState.
+ String topStateCount =
stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
+ if
(topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+ ||
topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
+ // If the StateModel allows for another replica with the topState to
be added,
+ // select the swap-in instance's replica to the topState.
+ return stateModelDef.getTopState();
+ } else {
+ // If StateModel does not allow another topState replica to be
+ // added, select the swap-in instance's replica to be the
secondTopState.
+ return stateModelDef.getSecondTopStates().iterator().next();
+ }
+ } else {
+ // If the swap-out instance's replica is not a topState or ERROR,
select the swap-in instance's replica
+ // to be the same state
+ return stateMap.get(swapOutInstance);
+ }
+ } else {
+ // If the swap-out instance's replica is not in the stateMap, select the
swap-in instance's replica
+ // to be the initialState. This happens when the swap-out node is
offline.
+ return stateModelDef.getInitialState();
Review Comment:
This is confusing to me. If the instance is not in the state map, it is not supposed
to be in the preference list. Also, the original swap-out instance has already been
dropped, so I think we should not assign a state to that instance at all?
##########
helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java:
##########
@@ -131,82 +135,102 @@ public void process(ClusterEvent event) throws Exception
{
});
}
+ private String selectSwapInState(StateModelDefinition stateModelDef,
Map<String, String> stateMap,
+ String swapOutInstance) {
+ // If the swap-in node is live, select state with the following logic:
+ // 1. If the swap-out instance's replica is in the stateMap:
+ // - if the swap-out instance's replica is a topState, select the swap-in
instance's replica to the topState.
+ // if another is allowed to be added, otherwise select the swap-in
instance's replica to a secondTopState.
+ // - if the swap-out instance's replica is not a topState or ERROR, select
the swap-in instance's replica to the same state.
+ // - if the swap-out instance's replica is ERROR, select the swap-in
instance's replica to the initialState.
+ // 2. If the swap-out instance's replica is not in the stateMap, select
the swap-in instance's replica to the initialState.
+ // This happens when the swap-out node is offline.
+ if (stateMap.containsKey(swapOutInstance)) {
+ if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState()) ||
stateMap.get(
+ swapOutInstance).equals(HelixDefinedState.ERROR.name())) {
+ // If the swap-out instance's replica is a topState, select the
swap-in instance's replica
+ // to be the topState if the StateModel allows another to be added. If
not, select the swap-in
+ // to be the secondTopState.
+ String topStateCount =
stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
+ if
(topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+ ||
topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
+ // If the StateModel allows for another replica with the topState to
be added,
+ // select the swap-in instance's replica to the topState.
+ return stateModelDef.getTopState();
+ } else {
+ // If StateModel does not allow another topState replica to be
+ // added, select the swap-in instance's replica to be the
secondTopState.
+ return stateModelDef.getSecondTopStates().iterator().next();
+ }
Review Comment:
Usually, when the if-branch ends with a return statement, there is no need to
specify the else clause.
##########
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java:
##########
@@ -206,113 +205,29 @@ public void addInstance(String clusterName,
InstanceConfig instanceConfig) {
throw new HelixException("Node " + nodeId + " already exists in cluster
" + clusterName);
}
- if (!ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE.contains(
- instanceConfig.getInstanceOperation())) {
+ List<InstanceConfig> matchingLogicalIdInstances =
+ findInstancesMatchingLogicalId(clusterName, instanceConfig);
+ if (matchingLogicalIdInstances.size() > 1) {
throw new HelixException(
- "Instance can only be added if InstanceOperation is set to one of" +
"the following: "
- + ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE + " This
instance: " + nodeId
- + " has InstanceOperation set to " +
instanceConfig.getInstanceOperation());
+ "There are already more than one instance with the same logicalId in
the cluster: "
+ +
matchingLogicalIdInstances.stream().map(InstanceConfig::getInstanceName)
+ .collect(Collectors.joining(", "))
+ + " Please make sure there is at most 2 instance with the same
logicalId in the cluster.");
}
- // Get the topology key used to determine the logicalId of a node.
- ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
- ClusterTopologyConfig clusterTopologyConfig =
- ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
- String logicalIdKey = clusterTopologyConfig.getEndNodeType();
- String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
- String toAddInstanceLogicalId = instanceConfig.getLogicalId(logicalIdKey);
-
- HelixConfigScope instanceConfigScope =
- new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
- clusterName).build();
- List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
- List<InstanceConfig> foundInstanceConfigsWithMatchingLogicalId =
- existingInstanceIds.parallelStream()
- .map(existingInstanceId -> getInstanceConfig(clusterName,
existingInstanceId)).filter(
- existingInstanceConfig ->
existingInstanceConfig.getLogicalId(logicalIdKey)
-
.equals(toAddInstanceLogicalId)).collect(Collectors.toList());
-
- if (foundInstanceConfigsWithMatchingLogicalId.size() >= 2) {
- // If the length is 2, we cannot add an instance with the same logicalId
as an existing instance
- // regardless of InstanceOperation.
- throw new HelixException(
- "There can only be 2 instances with the same logicalId in a cluster.
"
- + "Existing instances: " +
foundInstanceConfigsWithMatchingLogicalId.get(0)
- .getInstanceName() + " and " +
foundInstanceConfigsWithMatchingLogicalId.get(1)
- .getInstanceName() + " already have the same logicalId: " +
toAddInstanceLogicalId
- + "; therefore, " + nodeId + " cannot be added to the cluster.");
- } else if (foundInstanceConfigsWithMatchingLogicalId.size() == 1) {
- // If there is only one instance with the same logicalId,
- // we can infer that the intended behaviour is to SWAP_IN or EVACUATE +
ADD.
- if
(foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
- // If the existing instance with the same logicalId has SWAP_OUT
InstanceOperation
-
- // If the InstanceOperation is unset, we will set it to SWAP_IN.
- if (!instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
-
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
- }
-
- // If the existing instance with the same logicalId is not in the same
FAULT_ZONE as this instance, we cannot
- // add this instance.
- if (!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap()
- .containsKey(faultZoneKey) ||
!instanceConfig.getDomainAsMap().containsKey(faultZoneKey)
- ||
!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap().get(faultZoneKey)
- .equals(instanceConfig.getDomainAsMap().get(faultZoneKey))) {
- throw new HelixException(
- "Instance can only be added if the SWAP_OUT instance sharing the
same logicalId is in the same FAULT_ZONE"
- + " as this instance. " + "Existing instance: "
- +
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
- + " has FAULT_ZONE_TYPE: " +
foundInstanceConfigsWithMatchingLogicalId.get(0)
- .getDomainAsMap().get(faultZoneKey) + " and this instance: "
+ nodeId
- + " has FAULT_ZONE_TYPE: " +
instanceConfig.getDomainAsMap().get(faultZoneKey));
- }
-
- Map<String, Integer> foundInstanceCapacityMap =
-
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap().isEmpty()
- ? clusterConfig.getDefaultInstanceCapacityMap()
- :
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap();
- Map<String, Integer> instanceCapacityMap =
instanceConfig.getInstanceCapacityMap().isEmpty()
- ? clusterConfig.getDefaultInstanceCapacityMap()
- : instanceConfig.getInstanceCapacityMap();
- // If the instance does not have the same capacity, we cannot add this
instance.
- if (!new EqualsBuilder().append(foundInstanceCapacityMap,
instanceCapacityMap).isEquals()) {
- throw new HelixException(
- "Instance can only be added if the SWAP_OUT instance sharing the
same logicalId has the same capacity"
- + " as this instance. " + "Existing instance: "
- +
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
- + " has capacity: " + foundInstanceCapacityMap + " and this
instance: " + nodeId
- + " has capacity: " + instanceCapacityMap);
- }
- } else if
(foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
- // No need to check anything on the new node, the old node will be
evacuated and the new node
- // will be added.
- } else {
- // If the instanceConfig.getInstanceEnabled() is true and the existing
instance with the same logicalId
- // does not have InstanceOperation set to one of the above, we cannot
add this instance.
- throw new HelixException(
- "Instance can only be added if the exising instance sharing the
same logicalId"
- + " has InstanceOperation set to "
- + InstanceConstants.InstanceOperation.SWAP_OUT.name()
- + " and this instance has InstanceOperation set to "
- + InstanceConstants.InstanceOperation.SWAP_IN.name()
- + " or the existing instance sharing the same logicalId has
Instance Operation set to "
- + InstanceConstants.InstanceOperation.EVACUATE.name()
- + " and this instance has InstanceOperation unset. Existing
instance: "
- +
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
- + " has InstanceOperation: " +
foundInstanceConfigsWithMatchingLogicalId.get(0)
- .getInstanceOperation());
- }
- } else if (!instanceConfig.getInstanceOperation().isEmpty()) {
- // If there are no instances with the same logicalId, we can only add
this instance if InstanceOperation
- // is unset because it is a new instance.
- throw new HelixException(
- "There is no instance with logicalId: " + toAddInstanceLogicalId + "
in cluster: "
- + clusterName + "; therefore, " + nodeId
- + " cannot join cluster with InstanceOperation set to "
- + instanceConfig.getInstanceOperation() + ".");
+ InstanceConstants.InstanceOperation attemptedInstanceOperation =
+ instanceConfig.getInstanceOperation();
+ try {
+ validateInstanceOperationTransition(instanceConfig,
+ !matchingLogicalIdInstances.isEmpty() ?
matchingLogicalIdInstances.get(0) : null,
+ InstanceConstants.InstanceOperation.UNKNOWN,
+ attemptedInstanceOperation, clusterName);
+ } catch (HelixException e) {
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
+ logger.error("Failed to add instance " +
instanceConfig.getInstanceName() + " to cluster "
+ + clusterName + " with instance operation " +
attemptedInstanceOperation
+ + ". Setting INSTANCE_OPERATION to " +
instanceConfig.getInstanceOperation()
+ + " instead.", e);
Review Comment:
Be careful with logging: if this keeps happening on every pipeline run, logging the
full stack trace each time will flood the server logs.
##########
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java:
##########
@@ -146,6 +148,16 @@ private void generateMessage(final Resource resource,
final BaseControllerDataPr
}
}
+ // Look through the current state map and add DROPPED message if the
instance is not in the
+ // resourceStateMap. This instance may not have been dropped by the
rebalance strategy.
+ // This check is required to ensure that the instances removed from the
ideal state stateMap
+ // are properly dropped.
+ for (String instance : currentStateMap.keySet()) {
+ if (!instanceStateMap.containsKey(instance)) {
+ instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
Review Comment:
Make sure this is only temporary usage and is not persisted back to ZK.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]