OneSizeFitsQuorum commented on code in PR #13559:
URL: https://github.com/apache/iotdb/pull/13559#discussion_r1774756266
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -598,14 +607,109 @@ public boolean removeAINode(RemoveAINodePlan
removeAINodePlan) {
return true;
}
- // region region migration
+ public TSStatus checkRemoveDataNodes(List<TDataNodeLocation>
dataNodeLocations) {
+ // 1. Only one RemoveDataNodesProcedure is allowed in the cluster
+ Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
+ this.executor.getProcedures().values().stream()
+ .filter(
+ procedure -> {
+ if (procedure instanceof RemoveDataNodesProcedure) {
+ return !procedure.isFinished();
+ }
+ return false;
+ })
+ .findAny();
+
+ String failMessage = null;
+ if (anotherRemoveProcedure.isPresent()) {
+ List<TDataNodeLocation> anotherRemoveDataNodes =
+ ((RemoveDataNodesProcedure)
anotherRemoveProcedure.get()).getRemovedDataNodes();
+ failMessage =
+ String.format(
+ "Submit RemoveDataNodesProcedure failed, "
+ + "because another RemoveDataNodesProcedure %s is already in
processing. "
+ + "IoTDB is able to have at most 1 RemoveDataNodesProcedure
at the same time. "
+ + "For further information, please search [pid%d] in log. ",
+ anotherRemoveDataNodes,
anotherRemoveProcedure.get().getProcId());
+ }
+
+ // 2. Check if the RemoveDataNodesProcedure conflicts with the
RegionMigrateProcedure
+ RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
+ Set<TConsensusGroupId> removedDataNodesRegionSet =
+ manager.getRemovedDataNodesRegionSet(dataNodeLocations);
+ Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure
=
+ this.executor.getProcedures().values().stream()
+ .filter(
+ procedure -> {
+ if (procedure instanceof RegionMigrateProcedure) {
+ RegionMigrateProcedure regionMigrateProcedure =
+ (RegionMigrateProcedure) procedure;
+ if (regionMigrateProcedure.isFinished()) {
+ return false;
+ }
+ return removedDataNodesRegionSet.contains(
+ regionMigrateProcedure.getConsensusGroupId())
+ ||
dataNodeLocations.contains(regionMigrateProcedure.getDestDataNode());
+ }
+ return false;
+ })
+ .findAny();
+ if (conflictRegionMigrateProcedure.isPresent()) {
+ failMessage =
+ String.format(
+ "Submit RemoveDataNodesProcedure failed, "
+ + "because another RegionMigrateProcedure %s is already in
processing which conflicts with this RemoveDataNodesProcedure. "
+ + "The RegionMigrateProcedure is migrating the region %s to
the DataNode %s. "
+ + "For further information, please search [pid%d] in log. ",
+ conflictRegionMigrateProcedure.get().getProcId(),
+ ((RegionMigrateProcedure)
conflictRegionMigrateProcedure.get()).getConsensusGroupId(),
+ ((RegionMigrateProcedure)
conflictRegionMigrateProcedure.get()).getDestDataNode(),
+ conflictRegionMigrateProcedure.get().getProcId());
+ }
+ // 3. Check if the RegionMigrateProcedure generated by
RemoveDataNodesProcedure conflicts with
+ // each other
+ List<RegionMigrationPlan> regionMigrationPlans =
+ manager.getRegionMigrationPlans(dataNodeLocations);
+ removedDataNodesRegionSet.clear();
+ for (RegionMigrationPlan regionMigrationPlan : regionMigrationPlans) {
+ if
(removedDataNodesRegionSet.contains(regionMigrationPlan.getRegionId())) {
+ failMessage =
+ String.format(
+ "Submit RemoveDataNodesProcedure failed, "
+ + "because the RegionMigrateProcedure generated by this
RemoveDataNodesProcedure conflicts with each other. "
Review Comment:
What does the word "conflict" mean here? If only one replica of the same consensus
group is allowed to be deleted, please state that explicitly in the error message.
##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.sh:
##########
@@ -19,21 +19,35 @@
#
if [ "$#" -eq 1 ] && [ "$1" == "--help" ]; then
- echo "The script will remove a DataNode."
- echo "Before removing a DataNode, ensure that the cluster has at least the
number of data/schema replicas DataNodes."
+ echo "The script will remove one or more DataNodes."
+ echo "Before removing DataNodes, ensure that the cluster has at least the
number of data/schema replicas DataNodes."
echo "Usage:"
- echo "Remove the DataNode with datanode_id"
- echo "./sbin/remove-datanode.sh [datanode_id]"
+ echo "Remove one or more DataNodes with datanode_id"
+ echo "./sbin/remove-datanode.sh [datanode_id ...]"
exit 0
fi
+# Ensure that at least one DataNode ID is provided
+if [ "$#" -eq 0 ]; then
Review Comment:
If no ID is specified, the default behavior is to remove the local node, so it
seems this check is unnecessary. For example, we already handle `remove(null)` in
`iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/ServerCommandLine.java`.
##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.removedatanode;
+
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBRemoveDataNodeNormalIT extends
IoTDBRemoveDataNodeITFramework {
+ @Test
+ public void success1C3DTest() throws Exception {
+ successTest(2, 2, 1, 3, 1, 2);
Review Comment:
How about using the following configuration, to get closer to a real-world
scenario?
- schema replica num : 2
- data replica num : 2
- confignode num : 1
- datanode num : 4
- removeDataNode num : 1
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -577,17 +580,23 @@ public void removeConfigNode(RemoveConfigNodePlan
removeConfigNodePlan) {
}
/**
- * Generate {@link RemoveDataNodeProcedure}s, and serially execute all the
{@link
- * RemoveDataNodeProcedure}s.
+ * Generate {@link RemoveDataNodesProcedure}s, and serially execute all the
{@link
+ * RemoveDataNodesProcedure}s.
*/
- public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+ public synchronized boolean removeDataNode(RemoveDataNodePlan
removeDataNodePlan) {
Review Comment:
Do we need `synchronized` here, given that we already acquire
`getSubmitRegionMigrateLock().lock();`?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java:
##########
@@ -371,51 +370,58 @@ public TDataNodeRestartResp
updateDataNodeIfNecessary(TDataNodeRestartReq req) {
}
/**
- * Remove DataNodes.
+ * Removes the specified DataNodes.
*
- * @param removeDataNodePlan removeDataNodePlan
- * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the
request is accepted,
- * DATANODE_NOT_EXIST when some datanode does not exist.
+ * @param removeDataNodePlan the plan detailing which DataNodes to remove
+ * @return DataNodeToStatusResp, where the TSStatus will be SUCCEED_STATUS
if the request is
+ * accepted, or DATANODE_NOT_EXIST if any DataNode does not exist.
*/
- public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+ public synchronized DataSet removeDataNode(RemoveDataNodePlan
removeDataNodePlan) {
Review Comment:
Do we need this double locking, given that we already acquire
`getSubmitRegionMigrateLock().lock();`?
##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.bat:
##########
@@ -20,17 +20,40 @@
@echo off
IF "%~1"=="--help" (
- echo The script will remove a DataNode.
- echo Before removing a DataNode, ensure that the cluster has at least the
number of data/schema replicas DataNodes.
+ echo The script will remove one or more DataNodes.
+ echo Before removing DataNodes, ensure that the cluster has at least the
number of data/schema replicas DataNodes.
echo Usage:
- echo Remove the DataNode with datanode_id
- echo ./sbin/remove-datanode.bat [datanode_id]
+ echo Remove one or more DataNodes with datanode_id
+ echo ./sbin/remove-datanode.bat [datanode_id ...]
EXIT /B 0
)
-echo ````````````````````````
-echo Starting to remove a DataNode
-echo ````````````````````````
+REM Ensure that at least one DataNode ID is provided
+IF "%~1"=="" (
Review Comment:
same as below
##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.sh:
##########
@@ -19,21 +19,35 @@
#
if [ "$#" -eq 1 ] && [ "$1" == "--help" ]; then
- echo "The script will remove a DataNode."
- echo "Before removing a DataNode, ensure that the cluster has at least the
number of data/schema replicas DataNodes."
+ echo "The script will remove one or more DataNodes."
+ echo "Before removing DataNodes, ensure that the cluster has at least the
number of data/schema replicas DataNodes."
echo "Usage:"
- echo "Remove the DataNode with datanode_id"
- echo "./sbin/remove-datanode.sh [datanode_id]"
+ echo "Remove one or more DataNodes with datanode_id"
+ echo "./sbin/remove-datanode.sh [datanode_id ...]"
exit 0
fi
+# Ensure that at least one DataNode ID is provided
+if [ "$#" -eq 0 ]; then
+ echo "Error: At least one DataNode ID must be provided."
+ exit 1
+fi
+
+# Check for duplicate DataNode IDs
+ids=("$@")
+unique_ids=($(printf "%s\n" "${ids[@]}" | sort -u))
+if [ "${#ids[@]}" -ne "${#unique_ids[@]}" ]; then
+ echo "Error: Duplicate DataNode IDs found."
Review Comment:
Is this check necessary, given that you already use a HashSet for the node IDs?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -38,82 +40,89 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
/** remove data node procedure */
-public class RemoveDataNodeProcedure extends
AbstractNodeProcedure<RemoveDataNodeState> {
- private static final Logger LOG =
LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+public class RemoveDataNodesProcedure extends
AbstractNodeProcedure<RemoveDataNodeState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoveDataNodesProcedure.class);
private static final int RETRY_THRESHOLD = 5;
- private TDataNodeLocation removedDataNode;
+ private List<TDataNodeLocation> removedDataNodes;
- private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
+ private List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
- public RemoveDataNodeProcedure() {
+ private Map<Integer, NodeStatus> nodeStatusMap;
+
+ public RemoveDataNodesProcedure() {
super();
}
- public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
+ public RemoveDataNodesProcedure(
+ List<TDataNodeLocation> removedDataNodes, Map<Integer, NodeStatus>
nodeStatusMap) {
super();
- this.removedDataNode = removedDataNode;
+ this.removedDataNodes = removedDataNodes;
+ this.nodeStatusMap = nodeStatusMap;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env,
RemoveDataNodeState state) {
- if (removedDataNode == null) {
+ if (removedDataNodes.isEmpty()) {
return Flow.NO_MORE_STATE;
}
- RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
try {
switch (state) {
case REGION_REPLICA_CHECK:
- if (env.checkEnoughDataNodeAfterRemoving(removedDataNode)) {
+ if (manager.checkEnoughDataNodeAfterRemoving(removedDataNodes)) {
Review Comment:
Could we move this check before the procedure is submitted, so that the user can
see a detailed message explaining why the DataNode removal cannot be executed? Or
is this intended as a double check?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -695,69 +842,79 @@ private TSStatus checkRegionMigrate(
}
public synchronized TSStatus migrateRegion(TMigrateRegionReq
migrateRegionReq) {
Review Comment:
Do we need `synchronized` here?
##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.bat:
##########
@@ -20,17 +20,40 @@
@echo off
IF "%~1"=="--help" (
- echo The script will remove a DataNode.
- echo Before removing a DataNode, ensure that the cluster has at least the
number of data/schema replicas DataNodes.
+ echo The script will remove one or more DataNodes.
+ echo Before removing DataNodes, ensure that the cluster has at least the
number of data/schema replicas DataNodes.
echo Usage:
- echo Remove the DataNode with datanode_id
- echo ./sbin/remove-datanode.bat [datanode_id]
+ echo Remove one or more DataNodes with datanode_id
+ echo ./sbin/remove-datanode.bat [datanode_id ...]
EXIT /B 0
)
-echo ````````````````````````
-echo Starting to remove a DataNode
-echo ````````````````````````
+REM Ensure that at least one DataNode ID is provided
+IF "%~1"=="" (
+ echo Error: At least one DataNode ID must be provided.
+ EXIT /B 1
+)
+
+REM Check for duplicate DataNode IDs
+set "ids=%*"
Review Comment:
same as below
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/ServerCommandLine.java:
##########
@@ -39,10 +41,10 @@ public abstract class ServerCommandLine {
Option.builder("r")
.longOpt("remove")
.desc(
- "remove a node (with the given nodeId or the node started on the
current machine, if omitted)")
- .hasArg()
+ "remove one or more nodes (with the given nodeIds or the node
started on the current machine, if omitted)")
Review Comment:
But we can only remove one node at a time for ConfigNodes, right? So perhaps this
base class can't be documented like this directly? Alternatively, we could append
a note stating that DataNodes allow removing multiple nodes at once, while
ConfigNodes do not.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -38,82 +40,89 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
/** remove data node procedure */
-public class RemoveDataNodeProcedure extends
AbstractNodeProcedure<RemoveDataNodeState> {
- private static final Logger LOG =
LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+public class RemoveDataNodesProcedure extends
AbstractNodeProcedure<RemoveDataNodeState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoveDataNodesProcedure.class);
private static final int RETRY_THRESHOLD = 5;
- private TDataNodeLocation removedDataNode;
+ private List<TDataNodeLocation> removedDataNodes;
- private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
+ private List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
- public RemoveDataNodeProcedure() {
+ private Map<Integer, NodeStatus> nodeStatusMap;
+
+ public RemoveDataNodesProcedure() {
super();
}
- public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
+ public RemoveDataNodesProcedure(
+ List<TDataNodeLocation> removedDataNodes, Map<Integer, NodeStatus>
nodeStatusMap) {
super();
- this.removedDataNode = removedDataNode;
+ this.removedDataNodes = removedDataNodes;
+ this.nodeStatusMap = nodeStatusMap;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env,
RemoveDataNodeState state) {
- if (removedDataNode == null) {
+ if (removedDataNodes.isEmpty()) {
return Flow.NO_MORE_STATE;
}
- RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
try {
switch (state) {
case REGION_REPLICA_CHECK:
- if (env.checkEnoughDataNodeAfterRemoving(removedDataNode)) {
+ if (manager.checkEnoughDataNodeAfterRemoving(removedDataNodes)) {
setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
} else {
LOG.error(
"{}, Can not remove DataNode {} "
+ "because the number of DataNodes is less or equal than
region replica number",
REMOVE_DATANODE_PROCESS,
- removedDataNode);
+ removedDataNodes);
return Flow.NO_MORE_STATE;
}
case REMOVE_DATA_NODE_PREPARE:
- // mark the datanode as removing status and broadcast region route
map
- env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
- migratedDataNodeRegions =
handler.getMigratedDataNodeRegions(removedDataNode);
+ removedDataNodes.parallelStream()
Review Comment:
`parallelStream` is not recommended. Could we instead send the RPCs
asynchronously in parallel, wait for the results with a `CountDownLatch`, and then
update the in-memory state serially?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+
+ private final ConfigManager configManager;
+
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
dataNodeClientManager;
+
+ public RemoveDataNodeManager(ConfigManager configManager) {
+ this.configManager = configManager;
+ dataNodeClientManager =
+ new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ }
+
+ /**
+ * Check if the data nodes are sufficient after removing.
+ *
+ * @param removedDataNodes List<TDataNodeLocation>
+ * @return true if the number of DataNodes is enough, false otherwise
+ */
+ public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation>
removedDataNodes) {
+ final int availableDatanodeSize =
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.ReadOnly)
+ .size();
+
+ int dataNodeNumAfterRemoving = availableDatanodeSize;
+ for (TDataNodeLocation removedDatanode : removedDataNodes) {
+ if
(configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+ != NodeStatus.Unknown) {
+ dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+ }
+ }
+
+ return dataNodeNumAfterRemoving >= NodeInfo.getMinimumDataNode();
+ }
+
+ /**
+ * Changes the status of the specified DataNode to the given status. This is
done to prevent the
+ * DataNode from receiving read or write requests when it is being removed
or is in a restricted
+ * state.
+ *
+ * @param dataNodeLocation the location of the DataNode whose status needs
to be changed
+ * @param status the new status to assign to the DataNode (e.g., Removing,
Running, etc.)
+ */
+ public void changeDataNodeStatus(TDataNodeLocation dataNodeLocation,
NodeStatus status) {
+ // Send request to update NodeStatus on the DataNode to be removed
+ if
(configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
+ == NodeStatus.Unknown) {
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNodeLocation.getInternalEndPoint(),
+ status.getStatus(),
+ CnToDnRequestType.SET_SYSTEM_STATUS,
+ 1);
+ } else {
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNodeLocation.getInternalEndPoint(),
+ status.getStatus(),
+ CnToDnRequestType.SET_SYSTEM_STATUS);
+ }
+
+ long currentTime = System.nanoTime();
+ // Force updating NodeStatus to NodeStatus.Removing
+ configManager
+ .getLoadManager()
+ .forceUpdateNodeCache(
+ NodeType.DataNode,
+ dataNodeLocation.getDataNodeId(),
+ new NodeHeartbeatSample(currentTime, status));
+ Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>
removingHeartbeatSampleMap =
+ new TreeMap<>();
+ // Force update RegionStatus to NodeStatus.Removing
+ configManager
+ .getPartitionManager()
+ .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+ .forEach(
+ replicaSet ->
+ removingHeartbeatSampleMap.put(
+ replicaSet.getRegionId(),
+ Collections.singletonMap(
+ dataNodeLocation.getDataNodeId(),
+ new RegionHeartbeatSample(currentTime,
RegionStatus.Removing))));
+
configManager.getLoadManager().forceUpdateRegionGroupCache(removingHeartbeatSampleMap);
+ }
+
+ /**
+ * Retrieves all region migration plans for the specified removed DataNodes.
+ *
+ * @param removedDataNodes the list of DataNodes from which to obtain
migration plans
+ * @return a list of region migration plans associated with the removed
DataNodes
+ */
+ public List<RegionMigrationPlan> getRegionMigrationPlans(
+ List<TDataNodeLocation> removedDataNodes) {
+ List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+ for (TDataNodeLocation removedDataNode : removedDataNodes) {
+ List<TConsensusGroupId> migratedDataNodeRegions =
getMigratedDataNodeRegions(removedDataNode);
+ regionMigrationPlans.addAll(
+ migratedDataNodeRegions.stream()
+ .map(regionId -> RegionMigrationPlan.create(regionId,
removedDataNode))
+ .collect(Collectors.toList()));
+ }
+ return regionMigrationPlans;
+ }
+
+ /**
+ * Broadcasts DataNodes' status change, preventing disabled DataNodes from
accepting read or write
+ * requests.
+ *
+ * @param dataNodes the list of DataNodes that require broadcast status
changes
+ */
+ public void broadcastDataNodeStatusChange(List<TDataNodeLocation> dataNodes)
{
+ String dataNodesString =
+ dataNodes.stream()
+ .map(RegionMaintainHandler::getIdWithRpcEndpoint)
+ .collect(Collectors.joining(", "));
+ LOGGER.info(
+ "{}, BroadcastDataNodeStatusChange start, dataNode: {}",
+ REMOVE_DATANODE_PROCESS,
+ dataNodesString);
+
+ List<TDataNodeConfiguration> otherOnlineDataNodes =
+
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+ .filter(node -> !dataNodes.contains(node.getLocation()))
+ .collect(Collectors.toList());
+
+ for (TDataNodeConfiguration node : otherOnlineDataNodes) {
+ TCleanDataNodeCacheReq disableReq = new
TCleanDataNodeCacheReq(dataNodes);
+ TSStatus status =
+ (TSStatus)
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ node.getLocation().getInternalEndPoint(),
+ disableReq,
+ CnToDnRequestType.CLEAN_DATA_NODE_CACHE);
+ if (!isSucceed(status)) {
+ LOGGER.error(
+ "{}, BroadcastDataNodeStatusChange meets error, dataNode: {},
error: {}",
+ REMOVE_DATANODE_PROCESS,
+ dataNodesString,
+ status);
+ return;
+ }
+ }
+
+ LOGGER.info(
+ "{}, BroadcastDataNodeStatusChange finished, dataNode: {}",
+ REMOVE_DATANODE_PROCESS,
+ dataNodesString);
+ }
+
+ /**
+ * Removes a batch of DataNodes from the node information.
+ *
+ * @param removedDataNodes the list of DataNodeLocations to be removed
+ */
+ public void removeDataNodePersistence(List<TDataNodeLocation>
removedDataNodes) {
+ // Remove consensus record
+ try {
+ configManager.getConsensusManager().write(new
RemoveDataNodePlan(removedDataNodes));
+ } catch (ConsensusException e) {
+ LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
+ }
+
+ // Adjust maxRegionGroupNum
+ configManager.getClusterSchemaManager().adjustMaxRegionGroupNum();
+
+ // Remove metrics
+ for (TDataNodeLocation dataNodeLocation : removedDataNodes) {
+ PartitionMetrics.unbindDataNodePartitionMetricsWhenUpdate(
+ MetricService.getInstance(),
+
NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()));
+ }
+ }
+
+ /**
+ * Stops the specified old DataNodes.
+ *
+ * @param removedDataNodes the list of DataNodeLocations to be stopped
+ */
+ public void stopDataNodes(List<TDataNodeLocation> removedDataNodes) {
+ removedDataNodes.parallelStream().forEach(this::stopDataNode);
Review Comment:
Please use an asynchronous parallel approach; do not use `parallelStream`.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -38,82 +40,89 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
/** remove data node procedure */
-public class RemoveDataNodeProcedure extends
AbstractNodeProcedure<RemoveDataNodeState> {
- private static final Logger LOG =
LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+public class RemoveDataNodesProcedure extends
AbstractNodeProcedure<RemoveDataNodeState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoveDataNodesProcedure.class);
private static final int RETRY_THRESHOLD = 5;
- private TDataNodeLocation removedDataNode;
+ private List<TDataNodeLocation> removedDataNodes;
- private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
+ private List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
- public RemoveDataNodeProcedure() {
+ private Map<Integer, NodeStatus> nodeStatusMap;
+
+ public RemoveDataNodesProcedure() {
super();
}
- public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
+ public RemoveDataNodesProcedure(
+ List<TDataNodeLocation> removedDataNodes, Map<Integer, NodeStatus>
nodeStatusMap) {
super();
- this.removedDataNode = removedDataNode;
+ this.removedDataNodes = removedDataNodes;
+ this.nodeStatusMap = nodeStatusMap;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env,
RemoveDataNodeState state) {
- if (removedDataNode == null) {
+ if (removedDataNodes.isEmpty()) {
return Flow.NO_MORE_STATE;
}
- RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
try {
switch (state) {
case REGION_REPLICA_CHECK:
- if (env.checkEnoughDataNodeAfterRemoving(removedDataNode)) {
+ if (manager.checkEnoughDataNodeAfterRemoving(removedDataNodes)) {
setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
} else {
LOG.error(
"{}, Can not remove DataNode {} "
+ "because the number of DataNodes is less or equal than
region replica number",
REMOVE_DATANODE_PROCESS,
- removedDataNode);
+ removedDataNodes);
return Flow.NO_MORE_STATE;
}
case REMOVE_DATA_NODE_PREPARE:
- // mark the datanode as removing status and broadcast region route
map
- env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
- migratedDataNodeRegions =
handler.getMigratedDataNodeRegions(removedDataNode);
+ removedDataNodes.parallelStream()
Review Comment:
The previous code sent an RPC to only one node, so it could be implemented
synchronously. Now that RPCs are sent to multiple nodes in parallel, an
asynchronous approach is recommended.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java:
##########
@@ -371,51 +370,58 @@ public TDataNodeRestartResp
updateDataNodeIfNecessary(TDataNodeRestartReq req) {
}
/**
- * Remove DataNodes.
+ * Removes the specified DataNodes.
*
- * @param removeDataNodePlan removeDataNodePlan
- * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the
request is accepted,
- * DATANODE_NOT_EXIST when some datanode does not exist.
+ * @param removeDataNodePlan the plan detailing which DataNodes to remove
+ * @return DataNodeToStatusResp, where the TSStatus will be SUCCEED_STATUS
if the request is
+ * accepted, or DATANODE_NOT_EXIST if any DataNode does not exist.
*/
- public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+ public synchronized DataSet removeDataNode(RemoveDataNodePlan
removeDataNodePlan) {
+
configManager.getProcedureManager().getEnv().getSubmitRegionMigrateLock().lock();
LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
+ try {
+ // Checks if the RemoveDataNode request is valid
+ RemoveDataNodeManager manager =
+
configManager.getProcedureManager().getEnv().getRemoveDataNodeManager();
+ DataNodeToStatusResp preCheckStatus =
manager.checkRemoveDataNodeRequest(removeDataNodePlan);
+ if (preCheckStatus.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.error(
+ "The remove DataNode request check failed. req: {}, check result:
{}",
+ removeDataNodePlan,
+ preCheckStatus.getStatus());
+ return preCheckStatus;
+ }
- RegionMaintainHandler handler = new RegionMaintainHandler((ConfigManager)
configManager);
- DataNodeToStatusResp preCheckStatus =
handler.checkRemoveDataNodeRequest(removeDataNodePlan);
- if (preCheckStatus.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(
- "The remove DataNode request check failed. req: {}, check result:
{}",
- removeDataNodePlan,
- preCheckStatus.getStatus());
- return preCheckStatus;
- }
+ // Do transfer of the DataNodes before remove
+ DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
+ if
(configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataSet.setStatus(
+ new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode())
+ .setMessage("Fail to do transfer of the DataNodes"));
Review Comment:
There is a grammatical error in this message. Also, consider rewording it so
that the general meaning is that migrating the services on the removed
DataNodes failed, e.g. "Failed to migrate the services on the removed DataNodes".
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -212,22 +249,54 @@ protected RemoveDataNodeState getInitialState() {
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
super.serialize(stream);
- ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode,
stream);
- stream.writeInt(migratedDataNodeRegions.size());
- migratedDataNodeRegions.forEach(
- tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid,
stream));
+ stream.writeInt(removedDataNodes.size());
+ removedDataNodes.forEach(
+ dataNode ->
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNode, stream));
+ stream.writeInt(regionMigrationPlans.size());
+ regionMigrationPlans.forEach(
+ regionMigrationPlan -> {
+ ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
+ regionMigrationPlan.getRegionId(), stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(
+ regionMigrationPlan.getFromDataNode(), stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(
+ regionMigrationPlan.getToDataNode(), stream);
+ });
+ stream.writeInt(nodeStatusMap.size());
+ for (Map.Entry<Integer, NodeStatus> entry : nodeStatusMap.entrySet()) {
+ stream.writeInt(entry.getKey());
+ stream.writeByte(entry.getValue().ordinal());
+ }
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
- removedDataNode =
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
- int regionSize = byteBuffer.getInt();
- migratedDataNodeRegions = new ArrayList<>(regionSize);
- for (int i = 0; i < regionSize; i++) {
- migratedDataNodeRegions.add(
- ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+ int removedDataNodeSize = byteBuffer.getInt();
+ removedDataNodes = new ArrayList<>(removedDataNodeSize);
+ for (int i = 0; i < removedDataNodeSize; i++) {
+
removedDataNodes.add(ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer));
+ }
+ int regionMigrationPlanSize = byteBuffer.getInt();
+ regionMigrationPlans = new ArrayList<>(regionMigrationPlanSize);
+ for (int i = 0; i < regionMigrationPlanSize; i++) {
+ TConsensusGroupId regionId =
+ ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+ TDataNodeLocation fromDataNode =
+ ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+ RegionMigrationPlan regionMigrationPlan =
+ RegionMigrationPlan.create(regionId, fromDataNode);
+ regionMigrationPlan.setToDataNode(
+ ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer));
Review Comment:
same as above
##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.sh:
##########
@@ -19,21 +19,35 @@
#
if [ "$#" -eq 1 ] && [ "$1" == "--help" ]; then
- echo "The script will remove a DataNode."
- echo "Before removing a DataNode, ensure that the cluster has at least the
number of data/schema replicas DataNodes."
+ echo "The script will remove one or more DataNodes."
+ echo "Before removing DataNodes, ensure that the cluster has at least the
number of data/schema replicas DataNodes."
echo "Usage:"
- echo "Remove the DataNode with datanode_id"
- echo "./sbin/remove-datanode.sh [datanode_id]"
+ echo "Remove one or more DataNodes with datanode_id"
+ echo "./sbin/remove-datanode.sh [datanode_id ...]"
Review Comment:
Could you list an example here? Are the different DataNode ids separated by ','?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java:
##########
@@ -144,19 +145,26 @@ protected void start() throws IoTDBException {
}
@Override
- protected void remove(Long nodeId) throws IoTDBException {
+ protected void remove(Set<Integer> nodeIds) throws IoTDBException {
// If the nodeId was null, this is a shorthand for removing the current
dataNode.
// In this case we need to find our nodeId.
- if (nodeId == null) {
- nodeId = (long) CONF.getConfigNodeId();
+
+ int removeConfigNodeId = -1;
+ if (nodeIds == null) {
+ removeConfigNodeId = CONF.getConfigNodeId();
+ } else {
+ if (nodeIds.size() != 1) {
+ throw new IoTDBException("Invalid node-id", -1);
Review Comment:
The message "Invalid node-id" may be confusing. Perhaps it should state
explicitly that removing multiple ConfigNodes at the same time is not allowed.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -212,22 +249,54 @@ protected RemoveDataNodeState getInitialState() {
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
super.serialize(stream);
- ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode,
stream);
- stream.writeInt(migratedDataNodeRegions.size());
- migratedDataNodeRegions.forEach(
- tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid,
stream));
+ stream.writeInt(removedDataNodes.size());
+ removedDataNodes.forEach(
+ dataNode ->
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNode, stream));
+ stream.writeInt(regionMigrationPlans.size());
+ regionMigrationPlans.forEach(
Review Comment:
Why not use the serialization function of RegionMigrationPlan here?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+
+ private final ConfigManager configManager;
+
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
dataNodeClientManager;
+
+ public RemoveDataNodeManager(ConfigManager configManager) {
+ this.configManager = configManager;
+ dataNodeClientManager =
+ new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ }
+
+ /**
+ * Check if the data nodes are sufficient after removing.
+ *
+ * @param removedDataNodes List<TDataNodeLocation>
+ * @return true if the number of DataNodes is enough, false otherwise
+ */
+ public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation>
removedDataNodes) {
+ final int availableDatanodeSize =
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.ReadOnly)
+ .size();
+
+ int dataNodeNumAfterRemoving = availableDatanodeSize;
+ for (TDataNodeLocation removedDatanode : removedDataNodes) {
+ if
(configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+ != NodeStatus.Unknown) {
+ dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+ }
+ }
+
+ return dataNodeNumAfterRemoving >= NodeInfo.getMinimumDataNode();
+ }
+
+ /**
+ * Changes the status of the specified DataNode to the given status. This is
done to prevent the
+ * DataNode from receiving read or write requests when it is being removed
or is in a restricted
+ * state.
+ *
+ * @param dataNodeLocation the location of the DataNode whose status needs
to be changed
+ * @param status the new status to assign to the DataNode (e.g., Removing,
Running, etc.)
+ */
+ public void changeDataNodeStatus(TDataNodeLocation dataNodeLocation,
NodeStatus status) {
+ // Send request to update NodeStatus on the DataNode to be removed
+ if
(configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
+ == NodeStatus.Unknown) {
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNodeLocation.getInternalEndPoint(),
+ status.getStatus(),
+ CnToDnRequestType.SET_SYSTEM_STATUS,
+ 1);
+ } else {
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNodeLocation.getInternalEndPoint(),
+ status.getStatus(),
+ CnToDnRequestType.SET_SYSTEM_STATUS);
+ }
+
+ long currentTime = System.nanoTime();
+ // Force updating NodeStatus to NodeStatus.Removing
+ configManager
+ .getLoadManager()
+ .forceUpdateNodeCache(
+ NodeType.DataNode,
+ dataNodeLocation.getDataNodeId(),
+ new NodeHeartbeatSample(currentTime, status));
+ Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>
removingHeartbeatSampleMap =
+ new TreeMap<>();
+ // Force update RegionStatus to NodeStatus.Removing
+ configManager
+ .getPartitionManager()
+ .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+ .forEach(
+ replicaSet ->
+ removingHeartbeatSampleMap.put(
+ replicaSet.getRegionId(),
+ Collections.singletonMap(
+ dataNodeLocation.getDataNodeId(),
+ new RegionHeartbeatSample(currentTime,
RegionStatus.Removing))));
+
configManager.getLoadManager().forceUpdateRegionGroupCache(removingHeartbeatSampleMap);
+ }
+
+ /**
+ * Retrieves all region migration plans for the specified removed DataNodes.
+ *
+ * @param removedDataNodes the list of DataNodes from which to obtain
migration plans
+ * @return a list of region migration plans associated with the removed
DataNodes
+ */
+ public List<RegionMigrationPlan> getRegionMigrationPlans(
+ List<TDataNodeLocation> removedDataNodes) {
+ List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+ for (TDataNodeLocation removedDataNode : removedDataNodes) {
+ List<TConsensusGroupId> migratedDataNodeRegions =
getMigratedDataNodeRegions(removedDataNode);
+ regionMigrationPlans.addAll(
+ migratedDataNodeRegions.stream()
+ .map(regionId -> RegionMigrationPlan.create(regionId,
removedDataNode))
+ .collect(Collectors.toList()));
+ }
+ return regionMigrationPlans;
+ }
+
+ /**
+ * Broadcasts DataNodes' status change, preventing disabled DataNodes from
accepting read or write
+ * requests.
+ *
+ * @param dataNodes the list of DataNodes that require broadcast status
changes
+ */
+ public void broadcastDataNodeStatusChange(List<TDataNodeLocation> dataNodes)
{
+ String dataNodesString =
+ dataNodes.stream()
+ .map(RegionMaintainHandler::getIdWithRpcEndpoint)
+ .collect(Collectors.joining(", "));
+ LOGGER.info(
+ "{}, BroadcastDataNodeStatusChange start, dataNode: {}",
+ REMOVE_DATANODE_PROCESS,
+ dataNodesString);
+
+ List<TDataNodeConfiguration> otherOnlineDataNodes =
+
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+ .filter(node -> !dataNodes.contains(node.getLocation()))
+ .collect(Collectors.toList());
+
+ for (TDataNodeConfiguration node : otherOnlineDataNodes) {
+ TCleanDataNodeCacheReq disableReq = new
TCleanDataNodeCacheReq(dataNodes);
+ TSStatus status =
+ (TSStatus)
+ SyncDataNodeClientPool.getInstance()
Review Comment:
Please send these requests in an asynchronous, parallel way.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -123,8 +132,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv
env, RemoveDataNodeState
}
private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
- migratedDataNodeRegions.forEach(
- regionId -> {
+ regionMigrationPlans.forEach(
+ regionMigrationPlan -> {
+ TConsensusGroupId regionId = regionMigrationPlan.getRegionId();
+ TDataNodeLocation removedDataNode =
regionMigrationPlan.getFromDataNode();
TDataNodeLocation destDataNode =
Review Comment:
Can we abstract this transformation from removedDataNodes to
RegionMigrationPlan into an interface, so that it can be hooked up to future
optimizations? In this PR the method could simply contain a for loop that
performs
`env.getRegionMaintainHandler().filterDataNodeWithOtherRegionReplica(regionId, destDataNode).orElse(removedDataNode);`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]