OneSizeFitsQuorum commented on code in PR #13559:
URL: https://github.com/apache/iotdb/pull/13559#discussion_r1774756266


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -598,14 +607,109 @@ public boolean removeAINode(RemoveAINodePlan removeAINodePlan) {
     return true;
   }
 
-  // region region migration
+  public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations) {
+    // 1. Only one RemoveDataNodesProcedure is allowed in the cluster
+    Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
+        this.executor.getProcedures().values().stream()
+            .filter(
+                procedure -> {
+                  if (procedure instanceof RemoveDataNodesProcedure) {
+                    return !procedure.isFinished();
+                  }
+                  return false;
+                })
+            .findAny();
+
+    String failMessage = null;
+    if (anotherRemoveProcedure.isPresent()) {
+      List<TDataNodeLocation> anotherRemoveDataNodes =
+          ((RemoveDataNodesProcedure) anotherRemoveProcedure.get()).getRemovedDataNodes();
+      failMessage =
+          String.format(
+              "Submit RemoveDataNodesProcedure failed, "
+                  + "because another RemoveDataNodesProcedure %s is already in processing. "
+                  + "IoTDB is able to have at most 1 RemoveDataNodesProcedure at the same time. "
+                  + "For further information, please search [pid%d] in log. ",
+              anotherRemoveDataNodes, anotherRemoveProcedure.get().getProcId());
+    }
+
+    // 2. Check if the RemoveDataNodesProcedure conflicts with the RegionMigrateProcedure
+    RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
+    Set<TConsensusGroupId> removedDataNodesRegionSet =
+        manager.getRemovedDataNodesRegionSet(dataNodeLocations);
+    Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure =
+        this.executor.getProcedures().values().stream()
+            .filter(
+                procedure -> {
+                  if (procedure instanceof RegionMigrateProcedure) {
+                    RegionMigrateProcedure regionMigrateProcedure =
+                        (RegionMigrateProcedure) procedure;
+                    if (regionMigrateProcedure.isFinished()) {
+                      return false;
+                    }
+                    return removedDataNodesRegionSet.contains(
+                            regionMigrateProcedure.getConsensusGroupId())
+                        || dataNodeLocations.contains(regionMigrateProcedure.getDestDataNode());
+                  }
+                  return false;
+                })
+            .findAny();
+    if (conflictRegionMigrateProcedure.isPresent()) {
+      failMessage =
+          String.format(
+              "Submit RemoveDataNodesProcedure failed, "
+                  + "because another RegionMigrateProcedure %s is already in processing which conflicts with this RemoveDataNodesProcedure. "
+                  + "The RegionMigrateProcedure is migrating the region %s to the DataNode %s. "
+                  + "For further information, please search [pid%d] in log. ",
+              conflictRegionMigrateProcedure.get().getProcId(),
+              ((RegionMigrateProcedure) conflictRegionMigrateProcedure.get()).getConsensusGroupId(),
+              ((RegionMigrateProcedure) conflictRegionMigrateProcedure.get()).getDestDataNode(),
+              conflictRegionMigrateProcedure.get().getProcId());
+    }
+    // 3. Check if the RegionMigrateProcedure generated by RemoveDataNodesProcedure conflicts with
+    // each other
+    List<RegionMigrationPlan> regionMigrationPlans =
+        manager.getRegionMigrationPlans(dataNodeLocations);
+    removedDataNodesRegionSet.clear();
+    for (RegionMigrationPlan regionMigrationPlan : regionMigrationPlans) {
+      if (removedDataNodesRegionSet.contains(regionMigrationPlan.getRegionId())) {
+        failMessage =
+            String.format(
+                "Submit RemoveDataNodesProcedure failed, "
+                    + "because the RegionMigrateProcedure generated by this RemoveDataNodesProcedure conflicts with each other. "

Review Comment:
   What does "conflict" mean here? If only one replica of the same consensus group is allowed to be removed at a time, state that explicitly in the error message.
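   If that reading is right (at most one replica of a consensus group may be removed per operation), a duplicate-region check with an explicit message could look like the sketch below. All names here are hypothetical illustrations, not the PR's actual API:

   ```java
   import java.util.*;

   public class DuplicateRegionCheck {
       // Hypothetical sketch: detect when two removal targets host replicas of the
       // same consensus group, i.e. more than one replica would be deleted at once.
       static Optional<Integer> findDuplicateRegion(List<Integer> regionIdsOfRemovedNodes) {
           Set<Integer> seen = new HashSet<>();
           for (Integer regionId : regionIdsOfRemovedNodes) {
               if (!seen.add(regionId)) {
                   return Optional.of(regionId); // a second replica of the same group
               }
           }
           return Optional.empty();
       }

       public static void main(String[] args) {
           // Region 7 appears twice: two of the removed DataNodes hold replicas of it.
           Optional<Integer> dup = findDuplicateRegion(Arrays.asList(3, 7, 9, 7));
           if (dup.isPresent()) {
               System.out.printf(
                   "Submit RemoveDataNodesProcedure failed: region %d has replicas on "
                       + "more than one of the DataNodes to be removed; only one replica "
                       + "of a consensus group may be removed at a time.%n",
                   dup.get());
           }
       }
   }
   ```

   The point is the wording of the message, which names the conflicting region and states the one-replica-at-a-time rule directly.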



##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.sh:
##########
@@ -19,21 +19,35 @@
 #
 
 if [ "$#" -eq 1 ] && [ "$1" == "--help" ]; then
-    echo "The script will remove a DataNode."
-    echo "Before removing a DataNode, ensure that the cluster has at least the number of data/schema replicas DataNodes."
+    echo "The script will remove one or more DataNodes."
+    echo "Before removing DataNodes, ensure that the cluster has at least the number of data/schema replicas DataNodes."
     echo "Usage:"
-    echo "Remove the DataNode with datanode_id"
-    echo "./sbin/remove-datanode.sh [datanode_id]"
+    echo "Remove one or more DataNodes with datanode_id"
+    echo "./sbin/remove-datanode.sh [datanode_id ...]"
     exit 0
 fi
 
+# Ensure that at least one DataNode ID is provided
+if [ "$#" -eq 0 ]; then

Review Comment:
   When no ID is specified, the default is to remove the local node, so this check seems unnecessary.
   
   For example, we already have `remove(null)` in `iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/ServerCommandLine.java`.



##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.removedatanode;
+
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBRemoveDataNodeNormalIT extends IoTDBRemoveDataNodeITFramework {
+  @Test
+  public void success1C3DTest() throws Exception {
+    successTest(2, 2, 1, 3, 1, 2);

Review Comment:
   How about using the following configuration to get closer to the actual scenario:
   - schema replica num : 2
   - data replica num : 2
   - confignode num : 1
   - datanode num : 4
   - removeDataNode num : 1



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -577,17 +580,23 @@ public void removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
   }
 
   /**
-   * Generate {@link RemoveDataNodeProcedure}s, and serially execute all the {@link
-   * RemoveDataNodeProcedure}s.
+   * Generate {@link RemoveDataNodesProcedure}s, and serially execute all the {@link
+   * RemoveDataNodesProcedure}s.
    */
-  public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+  public synchronized boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {

Review Comment:
   Do we still need `synchronized`, given that we already acquire `getSubmitRegionMigrateLock().lock()`?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java:
##########
@@ -371,51 +370,58 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
   }
 
   /**
-   * Remove DataNodes.
+   * Removes the specified DataNodes.
    *
-   * @param removeDataNodePlan removeDataNodePlan
-   * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
-   *     DATANODE_NOT_EXIST when some datanode does not exist.
+   * @param removeDataNodePlan the plan detailing which DataNodes to remove
+   * @return DataNodeToStatusResp, where the TSStatus will be SUCCEED_STATUS if the request is
+   *     accepted, or DATANODE_NOT_EXIST if any DataNode does not exist.
    */
-  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+  public synchronized DataSet removeDataNode(RemoveDataNodePlan 
removeDataNodePlan) {

Review Comment:
   Do we need this double locking, given that we already acquire `getSubmitRegionMigrateLock().lock()`?



##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.bat:
##########
@@ -20,17 +20,40 @@
 @echo off
 
 IF "%~1"=="--help" (
-    echo The script will remove a DataNode.
-    echo Before removing a DataNode, ensure that the cluster has at least the number of data/schema replicas DataNodes.
+    echo The script will remove one or more DataNodes.
+    echo Before removing DataNodes, ensure that the cluster has at least the number of data/schema replicas DataNodes.
     echo Usage:
-    echo Remove the DataNode with datanode_id
-    echo ./sbin/remove-datanode.bat [datanode_id]
+    echo Remove one or more DataNodes with datanode_id
+    echo ./sbin/remove-datanode.bat [datanode_id ...]
     EXIT /B 0
 )
 
-echo ````````````````````````
-echo Starting to remove a DataNode
-echo ````````````````````````
+REM Ensure that at least one DataNode ID is provided
+IF "%~1"=="" (

Review Comment:
   same as below



##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.sh:
##########
@@ -19,21 +19,35 @@
 #
 
 if [ "$#" -eq 1 ] && [ "$1" == "--help" ]; then
-    echo "The script will remove a DataNode."
-    echo "Before removing a DataNode, ensure that the cluster has at least the number of data/schema replicas DataNodes."
+    echo "The script will remove one or more DataNodes."
+    echo "Before removing DataNodes, ensure that the cluster has at least the number of data/schema replicas DataNodes."
     echo "Usage:"
-    echo "Remove the DataNode with datanode_id"
-    echo "./sbin/remove-datanode.sh [datanode_id]"
+    echo "Remove one or more DataNodes with datanode_id"
+    echo "./sbin/remove-datanode.sh [datanode_id ...]"
     exit 0
 fi
 
+# Ensure that at least one DataNode ID is provided
+if [ "$#" -eq 0 ]; then
+    echo "Error: At least one DataNode ID must be provided."
+    exit 1
+fi
+
+# Check for duplicate DataNode IDs
+ids=("$@")
+unique_ids=($(printf "%s\n" "${ids[@]}" | sort -u))
+if [ "${#ids[@]}" -ne "${#unique_ids[@]}" ]; then
+    echo "Error: Duplicate DataNode IDs found."

Review Comment:
   Is this check necessary, given that the Java side already deduplicates node IDs with a HashSet?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -38,82 +40,89 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 
 /** remove data node procedure */
-public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
-  private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+public class RemoveDataNodesProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodesProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
-  private TDataNodeLocation removedDataNode;
+  private List<TDataNodeLocation> removedDataNodes;
 
-  private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
+  private List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
 
-  public RemoveDataNodeProcedure() {
+  private Map<Integer, NodeStatus> nodeStatusMap;
+
+  public RemoveDataNodesProcedure() {
     super();
   }
 
-  public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
+  public RemoveDataNodesProcedure(
+      List<TDataNodeLocation> removedDataNodes, Map<Integer, NodeStatus> nodeStatusMap) {
     super();
-    this.removedDataNode = removedDataNode;
+    this.removedDataNodes = removedDataNodes;
+    this.nodeStatusMap = nodeStatusMap;
   }
 
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
-    if (removedDataNode == null) {
+    if (removedDataNodes.isEmpty()) {
       return Flow.NO_MORE_STATE;
     }
 
-    RegionMaintainHandler handler = env.getRegionMaintainHandler();
+    RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
     try {
       switch (state) {
         case REGION_REPLICA_CHECK:
-          if (env.checkEnoughDataNodeAfterRemoving(removedDataNode)) {
+          if (manager.checkEnoughDataNodeAfterRemoving(removedDataNodes)) {

Review Comment:
   Can we move this check to before the procedure is submitted, so that users can see a detailed message explaining why the DataNode removal cannot be executed? Or is this intended as a double check?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -695,69 +842,79 @@ private TSStatus checkRegionMigrate(
   }
 
  public synchronized TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {

Review Comment:
   Do we need `synchronized` here?



##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.bat:
##########
@@ -20,17 +20,40 @@
 @echo off
 
 IF "%~1"=="--help" (
-    echo The script will remove a DataNode.
-    echo Before removing a DataNode, ensure that the cluster has at least the number of data/schema replicas DataNodes.
+    echo The script will remove one or more DataNodes.
+    echo Before removing DataNodes, ensure that the cluster has at least the number of data/schema replicas DataNodes.
     echo Usage:
-    echo Remove the DataNode with datanode_id
-    echo ./sbin/remove-datanode.bat [datanode_id]
+    echo Remove one or more DataNodes with datanode_id
+    echo ./sbin/remove-datanode.bat [datanode_id ...]
     EXIT /B 0
 )
 
-echo ````````````````````````
-echo Starting to remove a DataNode
-echo ````````````````````````
+REM Ensure that at least one DataNode ID is provided
+IF "%~1"=="" (
+    echo Error: At least one DataNode ID must be provided.
+    EXIT /B 1
+)
+
+REM Check for duplicate DataNode IDs
+set "ids=%*"

Review Comment:
   same as below



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/ServerCommandLine.java:
##########
@@ -39,10 +41,10 @@ public abstract class ServerCommandLine {
       Option.builder("r")
           .longOpt("remove")
           .desc(
-              "remove a node (with the given nodeId or the node started on the current machine, if omitted)")
-          .hasArg()
+              "remove one or more nodes (with the given nodeIds or the node started on the current machine, if omitted)")

Review Comment:
   But we can only remove one ConfigNode at a time, right? So maybe this base class can't be changed like this directly. Alternatively, we could note in the description that DataNodes support removing multiple nodes at once while ConfigNodes do not.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -38,82 +40,89 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 
 /** remove data node procedure */
-public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
-  private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+public class RemoveDataNodesProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodesProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
-  private TDataNodeLocation removedDataNode;
+  private List<TDataNodeLocation> removedDataNodes;
 
-  private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
+  private List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
 
-  public RemoveDataNodeProcedure() {
+  private Map<Integer, NodeStatus> nodeStatusMap;
+
+  public RemoveDataNodesProcedure() {
     super();
   }
 
-  public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
+  public RemoveDataNodesProcedure(
+      List<TDataNodeLocation> removedDataNodes, Map<Integer, NodeStatus> nodeStatusMap) {
     super();
-    this.removedDataNode = removedDataNode;
+    this.removedDataNodes = removedDataNodes;
+    this.nodeStatusMap = nodeStatusMap;
   }
 
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
-    if (removedDataNode == null) {
+    if (removedDataNodes.isEmpty()) {
       return Flow.NO_MORE_STATE;
     }
 
-    RegionMaintainHandler handler = env.getRegionMaintainHandler();
+    RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
     try {
       switch (state) {
         case REGION_REPLICA_CHECK:
-          if (env.checkEnoughDataNodeAfterRemoving(removedDataNode)) {
+          if (manager.checkEnoughDataNodeAfterRemoving(removedDataNodes)) {
             setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
           } else {
             LOG.error(
                 "{}, Can not remove DataNode {} "
                     + "because the number of DataNodes is less or equal than region replica number",
                 REMOVE_DATANODE_PROCESS,
-                removedDataNode);
+                removedDataNodes);
             return Flow.NO_MORE_STATE;
           }
         case REMOVE_DATA_NODE_PREPARE:
-          // mark the datanode as removing status and broadcast region route map
-          env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
-          migratedDataNodeRegions = handler.getMigratedDataNodeRegions(removedDataNode);
+          removedDataNodes.parallelStream()

Review Comment:
   `parallelStream` is not recommended. Can we send the RPCs asynchronously in parallel, wait for the results with a `CountDownLatch`, and then update the in-memory state serially?
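   A minimal sketch of that pattern, with a placeholder where the real CN-to-DN RPC would go (the class and method names here are illustrative, not the PR's actual API):

   ```java
   import java.util.*;
   import java.util.concurrent.*;

   public class AsyncBroadcastSketch {
       // Fire the per-node RPCs in parallel on a dedicated executor, wait for all
       // of them with a CountDownLatch, then apply the in-memory state updates
       // serially on the calling thread so they stay deterministic.
       static List<String> markNodesAsRemoving(List<Integer> nodeIds) throws InterruptedException {
           ExecutorService pool =
               Executors.newFixedThreadPool(Math.max(1, Math.min(nodeIds.size(), 8)));
           CountDownLatch latch = new CountDownLatch(nodeIds.size());
           Map<Integer, Boolean> rpcResults = new ConcurrentHashMap<>();
           for (Integer id : nodeIds) {
               pool.execute(() -> {
                   try {
                       // placeholder for the real CN->DN SET_SYSTEM_STATUS request
                       rpcResults.put(id, true);
                   } finally {
                       latch.countDown();
                   }
               });
           }
           latch.await(); // all RPCs have completed (or failed)
           pool.shutdown();

           List<String> updates = new ArrayList<>();
           for (Integer id : nodeIds) { // serial, deterministic state update
               if (Boolean.TRUE.equals(rpcResults.get(id))) {
                   updates.add("DataNode-" + id + " -> Removing");
               }
           }
           return updates;
       }

       public static void main(String[] args) throws InterruptedException {
           System.out.println(markNodesAsRemoving(Arrays.asList(3, 5, 7)));
       }
   }
   ```

   Using an explicit executor also bounds the parallelism, whereas `parallelStream` borrows threads from the shared common pool.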



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+
+  private final ConfigManager configManager;
+
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> dataNodeClientManager;
+
+  public RemoveDataNodeManager(ConfigManager configManager) {
+    this.configManager = configManager;
+    dataNodeClientManager =
+        new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  /**
+   * Check if the data nodes are sufficient after removing.
+   *
+   * @param removedDataNodes List<TDataNodeLocation>
+   * @return true if the number of DataNodes is enough, false otherwise
+   */
+  public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation> removedDataNodes) {
+    final int availableDatanodeSize =
+        configManager
+            .getNodeManager()
+            .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly)
+            .size();
+
+    int dataNodeNumAfterRemoving = availableDatanodeSize;
+    for (TDataNodeLocation removedDatanode : removedDataNodes) {
+      if (configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+          != NodeStatus.Unknown) {
+        dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+      }
+    }
+
+    return dataNodeNumAfterRemoving >= NodeInfo.getMinimumDataNode();
+  }
+
+  /**
+   * Changes the status of the specified DataNode to the given status. This is done to prevent the
+   * DataNode from receiving read or write requests when it is being removed or is in a restricted
+   * state.
+   *
+   * @param dataNodeLocation the location of the DataNode whose status needs to be changed
+   * @param status the new status to assign to the DataNode (e.g., Removing, Running, etc.)
+   */
+  public void changeDataNodeStatus(TDataNodeLocation dataNodeLocation, NodeStatus status) {
+    // Send request to update NodeStatus on the DataNode to be removed
+    if (configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
+        == NodeStatus.Unknown) {
+      SyncDataNodeClientPool.getInstance()
+          .sendSyncRequestToDataNodeWithGivenRetry(
+              dataNodeLocation.getInternalEndPoint(),
+              status.getStatus(),
+              CnToDnRequestType.SET_SYSTEM_STATUS,
+              1);
+    } else {
+      SyncDataNodeClientPool.getInstance()
+          .sendSyncRequestToDataNodeWithRetry(
+              dataNodeLocation.getInternalEndPoint(),
+              status.getStatus(),
+              CnToDnRequestType.SET_SYSTEM_STATUS);
+    }
+
+    long currentTime = System.nanoTime();
+    // Force updating NodeStatus to NodeStatus.Removing
+    configManager
+        .getLoadManager()
+        .forceUpdateNodeCache(
+            NodeType.DataNode,
+            dataNodeLocation.getDataNodeId(),
+            new NodeHeartbeatSample(currentTime, status));
+    Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>> removingHeartbeatSampleMap =
+        new TreeMap<>();
+    // Force update RegionStatus to NodeStatus.Removing
+    configManager
+        .getPartitionManager()
+        .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+        .forEach(
+            replicaSet ->
+                removingHeartbeatSampleMap.put(
+                    replicaSet.getRegionId(),
+                    Collections.singletonMap(
+                        dataNodeLocation.getDataNodeId(),
+                        new RegionHeartbeatSample(currentTime, RegionStatus.Removing))));
+    configManager.getLoadManager().forceUpdateRegionGroupCache(removingHeartbeatSampleMap);
+  }
+
+  /**
+   * Retrieves all region migration plans for the specified removed DataNodes.
+   *
+   * @param removedDataNodes the list of DataNodes from which to obtain migration plans
+   * @return a list of region migration plans associated with the removed DataNodes
+   */
+  public List<RegionMigrationPlan> getRegionMigrationPlans(
+      List<TDataNodeLocation> removedDataNodes) {
+    List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+    for (TDataNodeLocation removedDataNode : removedDataNodes) {
+      List<TConsensusGroupId> migratedDataNodeRegions = getMigratedDataNodeRegions(removedDataNode);
+      regionMigrationPlans.addAll(
+          migratedDataNodeRegions.stream()
+              .map(regionId -> RegionMigrationPlan.create(regionId, removedDataNode))
+              .collect(Collectors.toList()));
+    }
+    return regionMigrationPlans;
+  }
+
+  /**
+   * Broadcasts DataNodes' status change, preventing disabled DataNodes from accepting read or write
+   * requests.
+   *
+   * @param dataNodes the list of DataNodes that require broadcast status changes
+   */
+  public void broadcastDataNodeStatusChange(List<TDataNodeLocation> dataNodes) {
+    String dataNodesString =
+        dataNodes.stream()
+            .map(RegionMaintainHandler::getIdWithRpcEndpoint)
+            .collect(Collectors.joining(", "));
+    LOGGER.info(
+        "{}, BroadcastDataNodeStatusChange start, dataNode: {}",
+        REMOVE_DATANODE_PROCESS,
+        dataNodesString);
+
+    List<TDataNodeConfiguration> otherOnlineDataNodes =
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+            .filter(node -> !dataNodes.contains(node.getLocation()))
+            .collect(Collectors.toList());
+
+    for (TDataNodeConfiguration node : otherOnlineDataNodes) {
+      TCleanDataNodeCacheReq disableReq = new TCleanDataNodeCacheReq(dataNodes);
+      TSStatus status =
+          (TSStatus)
+              SyncDataNodeClientPool.getInstance()
+                  .sendSyncRequestToDataNodeWithRetry(
+                      node.getLocation().getInternalEndPoint(),
+                      disableReq,
+                      CnToDnRequestType.CLEAN_DATA_NODE_CACHE);
+      if (!isSucceed(status)) {
+        LOGGER.error(
+            "{}, BroadcastDataNodeStatusChange meets error, dataNode: {}, error: {}",
+            REMOVE_DATANODE_PROCESS,
+            dataNodesString,
+            status);
+        return;
+      }
+    }
+
+    LOGGER.info(
+        "{}, BroadcastDataNodeStatusChange finished, dataNode: {}",
+        REMOVE_DATANODE_PROCESS,
+        dataNodesString);
+  }
+
+  /**
+   * Removes a batch of DataNodes from the node information.
+   *
+   * @param removedDataNodes the list of DataNodeLocations to be removed
+   */
+  public void removeDataNodePersistence(List<TDataNodeLocation> removedDataNodes) {
+    // Remove consensus record
+    try {
+      configManager.getConsensusManager().write(new RemoveDataNodePlan(removedDataNodes));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
+    }
+
+    // Adjust maxRegionGroupNum
+    configManager.getClusterSchemaManager().adjustMaxRegionGroupNum();
+
+    // Remove metrics
+    for (TDataNodeLocation dataNodeLocation : removedDataNodes) {
+      PartitionMetrics.unbindDataNodePartitionMetricsWhenUpdate(
+          MetricService.getInstance(),
+          NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()));
+    }
+  }
+
+  /**
+   * Stops the specified old DataNodes.
+   *
+   * @param removedDataNodes the list of DataNodeLocations to be stopped
+   */
+  public void stopDataNodes(List<TDataNodeLocation> removedDataNodes) {
+    removedDataNodes.parallelStream().forEach(this::stopDataNode);

Review Comment:
   Please use an asynchronous parallel approach instead of `parallelStream`.

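   One asynchronous alternative for `stopDataNodes` is to submit each stop request as a `CompletableFuture` on an explicit executor and join on `allOf`, so parallelism is bounded rather than borrowed from the common pool. This is an illustrative sketch; the stop-DataNode RPC is replaced by a placeholder:

   ```java
   import java.util.*;
   import java.util.concurrent.*;
   import java.util.stream.*;

   public class StopNodesAsyncSketch {
       // Submit each stop request asynchronously, then block until every future
       // has completed before returning control to the caller.
       static int stopAll(List<Integer> nodeIds) {
           ExecutorService pool = Executors.newFixedThreadPool(4);
           List<CompletableFuture<Integer>> futures =
               nodeIds.stream()
                   .map(id -> CompletableFuture.supplyAsync(() -> {
                       // placeholder for the real stop-DataNode RPC
                       return id;
                   }, pool))
                   .collect(Collectors.toList());
           CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
           pool.shutdown();
           return futures.size(); // number of stop requests that completed
       }

       public static void main(String[] args) {
           System.out.println(stopAll(Arrays.asList(1, 2, 3)));
       }
   }
   ```

   Per-node failures can also be inspected individually via each future, which `parallelStream().forEach(...)` makes awkward.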


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -38,82 +40,89 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 
 /** remove data node procedure */
-public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
-  private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+public class RemoveDataNodesProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodesProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
-  private TDataNodeLocation removedDataNode;
+  private List<TDataNodeLocation> removedDataNodes;
 
-  private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
+  private List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
 
-  public RemoveDataNodeProcedure() {
+  private Map<Integer, NodeStatus> nodeStatusMap;
+
+  public RemoveDataNodesProcedure() {
     super();
   }
 
-  public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
+  public RemoveDataNodesProcedure(
+      List<TDataNodeLocation> removedDataNodes, Map<Integer, NodeStatus> nodeStatusMap) {
     super();
-    this.removedDataNode = removedDataNode;
+    this.removedDataNodes = removedDataNodes;
+    this.nodeStatusMap = nodeStatusMap;
   }
 
   @Override
  protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
-    if (removedDataNode == null) {
+    if (removedDataNodes.isEmpty()) {
       return Flow.NO_MORE_STATE;
     }
 
-    RegionMaintainHandler handler = env.getRegionMaintainHandler();
+    RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
     try {
       switch (state) {
         case REGION_REPLICA_CHECK:
-          if (env.checkEnoughDataNodeAfterRemoving(removedDataNode)) {
+          if (manager.checkEnoughDataNodeAfterRemoving(removedDataNodes)) {
             setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
           } else {
             LOG.error(
                 "{}, Can not remove DataNode {} "
                    + "because the number of DataNodes is less or equal than region replica number",
                 REMOVE_DATANODE_PROCESS,
-                removedDataNode);
+                removedDataNodes);
             return Flow.NO_MORE_STATE;
           }
         case REMOVE_DATA_NODE_PREPARE:
-          // mark the datanode as removing status and broadcast region route map
-          env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
-          migratedDataNodeRegions = handler.getMigratedDataNodeRegions(removedDataNode);
+          removedDataNodes.parallelStream()

Review Comment:
   The previous code sent an RPC to only one node, so a synchronous implementation was fine. Now that RPCs must be sent to multiple nodes in parallel, an asynchronous approach is recommended.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java:
##########
@@ -371,51 +370,58 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
   }
 
   /**
-   * Remove DataNodes.
+   * Removes the specified DataNodes.
    *
-   * @param removeDataNodePlan removeDataNodePlan
-   * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
-   *     DATANODE_NOT_EXIST when some datanode does not exist.
+   * @param removeDataNodePlan the plan detailing which DataNodes to remove
+   * @return DataNodeToStatusResp, where the TSStatus will be SUCCEED_STATUS if the request is
+   *     accepted, or DATANODE_NOT_EXIST if any DataNode does not exist.
    */
-  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+  public synchronized DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+    configManager.getProcedureManager().getEnv().getSubmitRegionMigrateLock().lock();
     LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
+    try {
+      // Checks if the RemoveDataNode request is valid
+      RemoveDataNodeManager manager =
+          configManager.getProcedureManager().getEnv().getRemoveDataNodeManager();
+      DataNodeToStatusResp preCheckStatus = manager.checkRemoveDataNodeRequest(removeDataNodePlan);
+      if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.error(
+            "The remove DataNode request check failed. req: {}, check result: {}",
+            removeDataNodePlan,
+            preCheckStatus.getStatus());
+        return preCheckStatus;
+      }

-    RegionMaintainHandler handler = new RegionMaintainHandler((ConfigManager) configManager);
-    DataNodeToStatusResp preCheckStatus = handler.checkRemoveDataNodeRequest(removeDataNodePlan);
-    if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.error(
-          "The remove DataNode request check failed. req: {}, check result: {}",
-          removeDataNodePlan,
-          preCheckStatus.getStatus());
-      return preCheckStatus;
-    }
+      // Do transfer of the DataNodes before remove
+      DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
+      if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
+          != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        dataSet.setStatus(
+            new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode())
+                .setMessage("Fail to do transfer of the DataNodes"));

Review Comment:
   Grammar error in this message.
   
   I feel the wording could be changed to something like "Failed to migrate the services on the removed DataNodes" ~



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -212,22 +249,54 @@ protected RemoveDataNodeState getInitialState() {
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
     super.serialize(stream);
-    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode, stream);
-    stream.writeInt(migratedDataNodeRegions.size());
-    migratedDataNodeRegions.forEach(
-        tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
+    stream.writeInt(removedDataNodes.size());
+    removedDataNodes.forEach(
+        dataNode -> ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNode, stream));
+    stream.writeInt(regionMigrationPlans.size());
+    regionMigrationPlans.forEach(
+        regionMigrationPlan -> {
+          ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
+              regionMigrationPlan.getRegionId(), stream);
+          ThriftCommonsSerDeUtils.serializeTDataNodeLocation(
+              regionMigrationPlan.getFromDataNode(), stream);
+          ThriftCommonsSerDeUtils.serializeTDataNodeLocation(
+              regionMigrationPlan.getToDataNode(), stream);
+        });
+    stream.writeInt(nodeStatusMap.size());
+    for (Map.Entry<Integer, NodeStatus> entry : nodeStatusMap.entrySet()) {
+      stream.writeInt(entry.getKey());
+      stream.writeByte(entry.getValue().ordinal());
+    }
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      removedDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
-      int regionSize = byteBuffer.getInt();
-      migratedDataNodeRegions = new ArrayList<>(regionSize);
-      for (int i = 0; i < regionSize; i++) {
-        migratedDataNodeRegions.add(
-            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+      int removedDataNodeSize = byteBuffer.getInt();
+      removedDataNodes = new ArrayList<>(removedDataNodeSize);
+      for (int i = 0; i < removedDataNodeSize; i++) {
+        removedDataNodes.add(ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer));
+      }
+      int regionMigrationPlanSize = byteBuffer.getInt();
+      regionMigrationPlans = new ArrayList<>(regionMigrationPlanSize);
+      for (int i = 0; i < regionMigrationPlanSize; i++) {
+        TConsensusGroupId regionId =
+            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+        TDataNodeLocation fromDataNode =
+            ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+        RegionMigrationPlan regionMigrationPlan =
+            RegionMigrationPlan.create(regionId, fromDataNode);
+        regionMigrationPlan.setToDataNode(
+            ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer));

Review Comment:
   same as above



##########
iotdb-core/datanode/src/assembly/resources/sbin/remove-datanode.sh:
##########
@@ -19,21 +19,35 @@
 #
 
 if [ "$#" -eq 1 ] && [ "$1" == "--help" ]; then
-    echo "The script will remove a DataNode."
-    echo "Before removing a DataNode, ensure that the cluster has at least the number of data/schema replicas DataNodes."
+    echo "The script will remove one or more DataNodes."
+    echo "Before removing DataNodes, ensure that the cluster has at least the number of data/schema replicas DataNodes."
     echo "Usage:"
-    echo "Remove the DataNode with datanode_id"
-    echo "./sbin/remove-datanode.sh [datanode_id]"
+    echo "Remove one or more DataNodes with datanode_id"
+    echo "./sbin/remove-datanode.sh [datanode_id ...]"

Review Comment:
   Could you list an example? Are the different DataNode ids separated by ','?
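   For instance, assuming the ids are passed space-separated as the usage string `[datanode_id ...]` suggests (an assumption, not confirmed by the diff), the script's argument handling could be sketched as:

   ```shell
   # Hypothetical sketch: accept all space-separated ids from the command line,
   # e.g. ./sbin/remove-datanode.sh 3 4 5
   remove_datanodes() {
     # "$*" joins every argument with single spaces
     echo "Removing DataNodes: $*"
   }

   remove_datanodes 3 4 5
   ```

   Documenting one concrete invocation like `./sbin/remove-datanode.sh 3 4 5` in the help text would remove the ambiguity.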



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java:
##########
@@ -144,19 +145,26 @@ protected void start() throws IoTDBException {
   }
 
   @Override
-  protected void remove(Long nodeId) throws IoTDBException {
+  protected void remove(Set<Integer> nodeIds) throws IoTDBException {
     // If the nodeId was null, this is a shorthand for removing the current dataNode.
     // In this case we need to find our nodeId.
-    if (nodeId == null) {
-      nodeId = (long) CONF.getConfigNodeId();
+
+    int removeConfigNodeId = -1;
+    if (nodeIds == null) {
+      removeConfigNodeId = CONF.getConfigNodeId();
+    } else {
+      if (nodeIds.size() != 1) {
+        throw new IoTDBException("Invalid node-id", -1);

Review Comment:
   "Invalid node-id" may be confusing. Perhaps we should point out that removing multiple ConfigNodes at the same time is not allowed?
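   A hypothetical shape for a clearer guard (IoTDBException is replaced by IllegalArgumentException here only to keep the snippet self-contained):

   ```java
   import java.util.Set;

   public class RemoveGuardSketch {
     // Hypothetical guard: reject batch removal with an explicit reason instead
     // of the generic "Invalid node-id" message
     static void checkSingleConfigNode(Set<Integer> nodeIds) {
       if (nodeIds != null && nodeIds.size() != 1) {
         throw new IllegalArgumentException(
             "Removing multiple ConfigNodes at a time is not supported, please remove them one by one");
       }
     }

     public static void main(String[] args) {
       checkSingleConfigNode(Set.of(1)); // a single id passes the check
     }
   }
   ```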



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -212,22 +249,54 @@ protected RemoveDataNodeState getInitialState() {
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
     super.serialize(stream);
-    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode, stream);
-    stream.writeInt(migratedDataNodeRegions.size());
-    migratedDataNodeRegions.forEach(
-        tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
+    stream.writeInt(removedDataNodes.size());
+    removedDataNodes.forEach(
+        dataNode -> ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNode, stream));
+    stream.writeInt(regionMigrationPlans.size());
+    regionMigrationPlans.forEach(

Review Comment:
   Why not use RegionMigrationPlan's own serialize function here?
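   An illustrative sketch of the suggestion, keeping a serialize/deserialize pair on the plan itself so the procedure never duplicates the field layout (plain ints stand in for the real Thrift types `TConsensusGroupId` and `TDataNodeLocation`; names are hypothetical):

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;

   public class RegionMigrationPlanSketch {
     final int regionId;
     final int fromDataNode;
     final int toDataNode;

     RegionMigrationPlanSketch(int regionId, int fromDataNode, int toDataNode) {
       this.regionId = regionId;
       this.fromDataNode = fromDataNode;
       this.toDataNode = toDataNode;
     }

     // The plan writes its own fields, so callers never duplicate the layout
     void serialize(DataOutputStream stream) throws IOException {
       stream.writeInt(regionId);
       stream.writeInt(fromDataNode);
       stream.writeInt(toDataNode);
     }

     // The matching deserializer lives next to serialize, keeping the two in sync
     static RegionMigrationPlanSketch deserialize(ByteBuffer buffer) {
       return new RegionMigrationPlanSketch(buffer.getInt(), buffer.getInt(), buffer.getInt());
     }

     static RegionMigrationPlanSketch roundTrip(RegionMigrationPlanSketch plan) {
       try {
         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         plan.serialize(new DataOutputStream(bytes));
         return deserialize(ByteBuffer.wrap(bytes.toByteArray()));
       } catch (IOException e) {
         throw new RuntimeException(e); // in-memory streams should not fail
       }
     }

     public static void main(String[] args) {
       RegionMigrationPlanSketch copy = roundTrip(new RegionMigrationPlanSketch(7, 1, 2));
       System.out.println(copy.regionId + " " + copy.fromDataNode + " " + copy.toDataNode);
     }
   }
   ```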



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+
+  private final ConfigManager configManager;
+
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> dataNodeClientManager;
+
+  public RemoveDataNodeManager(ConfigManager configManager) {
+    this.configManager = configManager;
+    dataNodeClientManager =
+        new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  /**
+   * Check if the data nodes are sufficient after removing.
+   *
+   * @param removedDataNodes List<TDataNodeLocation>
+   * @return true if the number of DataNodes is enough, false otherwise
+   */
+  public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation> removedDataNodes) {
+    final int availableDatanodeSize =
+        configManager
+            .getNodeManager()
+            .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly)
+            .size();
+
+    int dataNodeNumAfterRemoving = availableDatanodeSize;
+    for (TDataNodeLocation removedDatanode : removedDataNodes) {
+      if (configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+          != NodeStatus.Unknown) {
+        dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+      }
+    }
+
+    return dataNodeNumAfterRemoving >= NodeInfo.getMinimumDataNode();
+  }
+
+  /**
+   * Changes the status of the specified DataNode to the given status. This is done to prevent the
+   * DataNode from receiving read or write requests when it is being removed or is in a restricted
+   * state.
+   *
+   * @param dataNodeLocation the location of the DataNode whose status needs to be changed
+   * @param status the new status to assign to the DataNode (e.g., Removing, Running, etc.)
+   */
+  public void changeDataNodeStatus(TDataNodeLocation dataNodeLocation, NodeStatus status) {
+    // Send request to update NodeStatus on the DataNode to be removed
+    if (configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
+        == NodeStatus.Unknown) {
+      SyncDataNodeClientPool.getInstance()
+          .sendSyncRequestToDataNodeWithGivenRetry(
+              dataNodeLocation.getInternalEndPoint(),
+              status.getStatus(),
+              CnToDnRequestType.SET_SYSTEM_STATUS,
+              1);
+    } else {
+      SyncDataNodeClientPool.getInstance()
+          .sendSyncRequestToDataNodeWithRetry(
+              dataNodeLocation.getInternalEndPoint(),
+              status.getStatus(),
+              CnToDnRequestType.SET_SYSTEM_STATUS);
+    }
+
+    long currentTime = System.nanoTime();
+    // Force updating NodeStatus to NodeStatus.Removing
+    configManager
+        .getLoadManager()
+        .forceUpdateNodeCache(
+            NodeType.DataNode,
+            dataNodeLocation.getDataNodeId(),
+            new NodeHeartbeatSample(currentTime, status));
+    Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>> removingHeartbeatSampleMap =
+        new TreeMap<>();
+    // Force update RegionStatus to NodeStatus.Removing
+    configManager
+        .getPartitionManager()
+        .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+        .forEach(
+            replicaSet ->
+                removingHeartbeatSampleMap.put(
+                    replicaSet.getRegionId(),
+                    Collections.singletonMap(
+                        dataNodeLocation.getDataNodeId(),
+                        new RegionHeartbeatSample(currentTime, RegionStatus.Removing))));
+    configManager.getLoadManager().forceUpdateRegionGroupCache(removingHeartbeatSampleMap);
+  }
+
+  /**
+   * Retrieves all region migration plans for the specified removed DataNodes.
+   *
+   * @param removedDataNodes the list of DataNodes from which to obtain migration plans
+   * @return a list of region migration plans associated with the removed DataNodes
+   */
+  public List<RegionMigrationPlan> getRegionMigrationPlans(
+      List<TDataNodeLocation> removedDataNodes) {
+    List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+    for (TDataNodeLocation removedDataNode : removedDataNodes) {
+      List<TConsensusGroupId> migratedDataNodeRegions = getMigratedDataNodeRegions(removedDataNode);
+      regionMigrationPlans.addAll(
+          migratedDataNodeRegions.stream()
+              .map(regionId -> RegionMigrationPlan.create(regionId, removedDataNode))
+              .collect(Collectors.toList()));
+    }
+    return regionMigrationPlans;
+  }
+
+  /**
+   * Broadcasts DataNodes' status change, preventing disabled DataNodes from accepting read or write
+   * requests.
+   *
+   * @param dataNodes the list of DataNodes that require broadcast status changes
+   */
+  public void broadcastDataNodeStatusChange(List<TDataNodeLocation> dataNodes) {
+    String dataNodesString =
+        dataNodes.stream()
+            .map(RegionMaintainHandler::getIdWithRpcEndpoint)
+            .collect(Collectors.joining(", "));
+    LOGGER.info(
+        "{}, BroadcastDataNodeStatusChange start, dataNode: {}",
+        REMOVE_DATANODE_PROCESS,
+        dataNodesString);
+
+    List<TDataNodeConfiguration> otherOnlineDataNodes =
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+            .filter(node -> !dataNodes.contains(node.getLocation()))
+            .collect(Collectors.toList());
+
+    for (TDataNodeConfiguration node : otherOnlineDataNodes) {
+      TCleanDataNodeCacheReq disableReq = new TCleanDataNodeCacheReq(dataNodes);
+      TSStatus status =
+          (TSStatus)
+              SyncDataNodeClientPool.getInstance()

Review Comment:
   Please use an asynchronous parallel approach here as well.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java:
##########
@@ -123,8 +132,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState
   }
 
   private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
-    migratedDataNodeRegions.forEach(
-        regionId -> {
+    regionMigrationPlans.forEach(
+        regionMigrationPlan -> {
+          TConsensusGroupId regionId = regionMigrationPlan.getRegionId();
+          TDataNodeLocation removedDataNode = regionMigrationPlan.getFromDataNode();
           TDataNodeLocation destDataNode =

Review Comment:
   Can we abstract this transformation from removedDataNodes to RegionMigrationPlan into an interface, so that it can be hooked up to future optimizations?
   
   In this PR the method could simply run a for loop that performs:
   `env.getRegionMaintainHandler().filterDataNodeWithOtherRegionReplica(regionId, destDataNode).orElse(removedDataNode);`
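   One possible shape for that abstraction, sketched with hypothetical names and plain ints standing in for `TConsensusGroupId` / `TDataNodeLocation`:

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class MigrationPlannerSketch {
     // Strategy interface: given a region and the node being removed, pick the
     // destination node. Future placement optimizations implement this without
     // touching the procedure code.
     interface DestinationSelector {
       int selectDestination(int regionId, int removedNode);
     }

     // Build one (regionId, fromNode, toNode) plan per region on a removed node
     static List<int[]> buildPlans(
         Map<Integer, Integer> regionToRemovedNode, DestinationSelector selector) {
       List<int[]> plans = new ArrayList<>();
       regionToRemovedNode.forEach(
           (regionId, fromNode) ->
               plans.add(
                   new int[] {regionId, fromNode, selector.selectDestination(regionId, fromNode)}));
       return plans;
     }

     public static void main(String[] args) {
       Map<Integer, Integer> regions = new LinkedHashMap<>();
       regions.put(10, 3); // region 10 currently lives on node 3, which is being removed
       // trivial strategy: always move to node 4; a smarter implementation could
       // balance load or respect rack awareness instead
       List<int[]> plans = buildPlans(regions, (regionId, fromNode) -> 4);
       System.out.println(plans.get(0)[0] + " " + plans.get(0)[1] + " " + plans.get(0)[2]);
     }
   }
   ```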



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
