liyuheng55555 commented on code in PR #13559:
URL: https://github.com/apache/iotdb/pull/13559#discussion_r1774808840


##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedureTest.java:
##########
@@ -29,21 +30,28 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-public class RemoveDataNodeProcedureTest {
+public class RemoveDataNodesProcedureTest {
 
   @Test
   public void serDeTest() throws IOException {
-    RemoveDataNodeProcedure procedure0 =
-        new RemoveDataNodeProcedure(
+    List<TDataNodeLocation> removedDataNodes =
+        Collections.singletonList(
             new TDataNodeLocation(
                 10,
                 new TEndPoint("127.0.0.1", 6667),
                 new TEndPoint("127.0.0.1", 6668),
                 new TEndPoint("127.0.0.1", 6669),
                 new TEndPoint("127.0.0.1", 6670),
                 new TEndPoint("127.0.0.1", 6671)));
-
+    Map<Integer, NodeStatus> nodeStatusMap = new HashMap<>();
+    nodeStatusMap.put(10, NodeStatus.Running);
+    RemoveDataNodesProcedure procedure0 =
+        new RemoveDataNodesProcedure(removedDataNodes, nodeStatusMap);

Review Comment:
   Is it necessary to add more tests here, e.g. for regionMigrationPlans?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  private final ConfigManager configManager;
+
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
dataNodeClientManager;
+
+  public RemoveDataNodeManager(ConfigManager configManager) {
+    this.configManager = configManager;
+    dataNodeClientManager =
+        new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new 
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  /**
+   * Check if the data nodes are sufficient after removing.
+   *
+   * @param removedDataNodes List<TDataNodeLocation>
+   * @return true if the number of DataNodes is enough, false otherwise
+   */
+  public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation> 
removedDataNodes) {
+    final int availableDatanodeSize =
+        configManager
+            .getNodeManager()
+            .filterDataNodeThroughStatus(NodeStatus.Running, 
NodeStatus.ReadOnly)
+            .size();
+
+    int dataNodeNumAfterRemoving = availableDatanodeSize;
+    for (TDataNodeLocation removedDatanode : removedDataNodes) {
+      if 
(configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+          != NodeStatus.Unknown) {
+        dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+      }
+    }

Review Comment:
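   Could you double-check this check? `dataNodeNumAfterRemoving = 
availableDatanodeSize - 1` subtracts at most one no matter how many DataNodes 
are being removed; it looks like it should decrement once for every removed 
DataNode whose status is not Unknown.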



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -598,14 +607,109 @@ public boolean removeAINode(RemoveAINodePlan 
removeAINodePlan) {
     return true;
   }
 
-  // region region migration
+  public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> 
dataNodeLocations) {
+    // 1. Only one RemoveDataNodesProcedure is allowed in the cluster
+    Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
+        this.executor.getProcedures().values().stream()
+            .filter(
+                procedure -> {
+                  if (procedure instanceof RemoveDataNodesProcedure) {
+                    return !procedure.isFinished();
+                  }
+                  return false;
+                })
+            .findAny();
+
+    String failMessage = null;
+    if (anotherRemoveProcedure.isPresent()) {
+      List<TDataNodeLocation> anotherRemoveDataNodes =
+          ((RemoveDataNodesProcedure) 
anotherRemoveProcedure.get()).getRemovedDataNodes();
+      failMessage =
+          String.format(
+              "Submit RemoveDataNodesProcedure failed, "
+                  + "because another RemoveDataNodesProcedure %s is already in 
processing. "
+                  + "IoTDB is able to have at most 1 RemoveDataNodesProcedure 
at the same time. "
+                  + "For further information, please search [pid%d] in log. ",
+              anotherRemoveDataNodes, 
anotherRemoveProcedure.get().getProcId());
+    }
+
+    // 2. Check if the RemoveDataNodesProcedure conflicts with the 
RegionMigrateProcedure
+    RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
+    Set<TConsensusGroupId> removedDataNodesRegionSet =
+        manager.getRemovedDataNodesRegionSet(dataNodeLocations);
+    Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure 
=
+        this.executor.getProcedures().values().stream()
+            .filter(
+                procedure -> {
+                  if (procedure instanceof RegionMigrateProcedure) {
+                    RegionMigrateProcedure regionMigrateProcedure =
+                        (RegionMigrateProcedure) procedure;
+                    if (regionMigrateProcedure.isFinished()) {
+                      return false;
+                    }
+                    return removedDataNodesRegionSet.contains(
+                            regionMigrateProcedure.getConsensusGroupId())
+                        || 
dataNodeLocations.contains(regionMigrateProcedure.getDestDataNode());
+                  }
+                  return false;
+                })
+            .findAny();
+    if (conflictRegionMigrateProcedure.isPresent()) {
+      failMessage =
+          String.format(
+              "Submit RemoveDataNodesProcedure failed, "
+                  + "because another RegionMigrateProcedure %s is already in 
processing which conflicts with this RemoveDataNodesProcedure. "
+                  + "The RegionMigrateProcedure is migrating the region %s to 
the DataNode %s. "
+                  + "For further information, please search [pid%d] in log. ",
+              conflictRegionMigrateProcedure.get().getProcId(),
+              ((RegionMigrateProcedure) 
conflictRegionMigrateProcedure.get()).getConsensusGroupId(),
+              ((RegionMigrateProcedure) 
conflictRegionMigrateProcedure.get()).getDestDataNode(),
+              conflictRegionMigrateProcedure.get().getProcId());
+    }

Review Comment:
   "Check for existing RegionMigrateProcedures when submitting a RemoveDataNodesProcedure"
   "Check for existing RemoveDataNodesProcedures when submitting a RegionMigrateProcedure"
   The logic of these two checks should be similar, so maybe extract a shared function?
   



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  private final ConfigManager configManager;
+
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
dataNodeClientManager;
+
+  public RemoveDataNodeManager(ConfigManager configManager) {
+    this.configManager = configManager;
+    dataNodeClientManager =
+        new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new 
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  /**
+   * Check if the data nodes are sufficient after removing.
+   *
+   * @param removedDataNodes List<TDataNodeLocation>
+   * @return true if the number of DataNodes is enough, false otherwise
+   */
+  public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation> 
removedDataNodes) {
+    final int availableDatanodeSize =
+        configManager
+            .getNodeManager()
+            .filterDataNodeThroughStatus(NodeStatus.Running, 
NodeStatus.ReadOnly)
+            .size();
+
+    int dataNodeNumAfterRemoving = availableDatanodeSize;
+    for (TDataNodeLocation removedDatanode : removedDataNodes) {
+      if 
(configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+          != NodeStatus.Unknown) {
+        dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+      }
+    }
+
+    return dataNodeNumAfterRemoving >= NodeInfo.getMinimumDataNode();
+  }
+
+  /**
+   * Changes the status of the specified DataNode to the given status. This is 
done to prevent the
+   * DataNode from receiving read or write requests when it is being removed 
or is in a restricted
+   * state.
+   *
+   * @param dataNodeLocation the location of the DataNode whose status needs 
to be changed
+   * @param status the new status to assign to the DataNode (e.g., Removing, 
Running, etc.)
+   */
+  public void changeDataNodeStatus(TDataNodeLocation dataNodeLocation, 
NodeStatus status) {
+    // Send request to update NodeStatus on the DataNode to be removed
+    if 
(configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
+        == NodeStatus.Unknown) {
+      SyncDataNodeClientPool.getInstance()
+          .sendSyncRequestToDataNodeWithGivenRetry(
+              dataNodeLocation.getInternalEndPoint(),
+              status.getStatus(),
+              CnToDnRequestType.SET_SYSTEM_STATUS,
+              1);
+    } else {
+      SyncDataNodeClientPool.getInstance()
+          .sendSyncRequestToDataNodeWithRetry(
+              dataNodeLocation.getInternalEndPoint(),
+              status.getStatus(),
+              CnToDnRequestType.SET_SYSTEM_STATUS);
+    }
+
+    long currentTime = System.nanoTime();
+    // Force updating NodeStatus to NodeStatus.Removing
+    configManager
+        .getLoadManager()
+        .forceUpdateNodeCache(
+            NodeType.DataNode,
+            dataNodeLocation.getDataNodeId(),
+            new NodeHeartbeatSample(currentTime, status));
+    Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>> 
removingHeartbeatSampleMap =
+        new TreeMap<>();
+    // Force update RegionStatus to NodeStatus.Removing
+    configManager
+        .getPartitionManager()
+        .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+        .forEach(
+            replicaSet ->
+                removingHeartbeatSampleMap.put(
+                    replicaSet.getRegionId(),
+                    Collections.singletonMap(
+                        dataNodeLocation.getDataNodeId(),
+                        new RegionHeartbeatSample(currentTime, 
RegionStatus.Removing))));
+    
configManager.getLoadManager().forceUpdateRegionGroupCache(removingHeartbeatSampleMap);
+  }
+
+  /**
+   * Retrieves all region migration plans for the specified removed DataNodes.
+   *
+   * @param removedDataNodes the list of DataNodes from which to obtain 
migration plans
+   * @return a list of region migration plans associated with the removed 
DataNodes
+   */
+  public List<RegionMigrationPlan> getRegionMigrationPlans(
+      List<TDataNodeLocation> removedDataNodes) {
+    List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+    for (TDataNodeLocation removedDataNode : removedDataNodes) {
+      List<TConsensusGroupId> migratedDataNodeRegions = 
getMigratedDataNodeRegions(removedDataNode);
+      regionMigrationPlans.addAll(
+          migratedDataNodeRegions.stream()
+              .map(regionId -> RegionMigrationPlan.create(regionId, 
removedDataNode))
+              .collect(Collectors.toList()));
+    }
+    return regionMigrationPlans;
+  }
+
+  /**
+   * Broadcasts DataNodes' status change, preventing disabled DataNodes from 
accepting read or write
+   * requests.
+   *
+   * @param dataNodes the list of DataNodes that require broadcast status 
changes
+   */
+  public void broadcastDataNodeStatusChange(List<TDataNodeLocation> dataNodes) 
{
+    String dataNodesString =
+        dataNodes.stream()
+            .map(RegionMaintainHandler::getIdWithRpcEndpoint)
+            .collect(Collectors.joining(", "));
+    LOGGER.info(
+        "{}, BroadcastDataNodeStatusChange start, dataNode: {}",
+        REMOVE_DATANODE_PROCESS,
+        dataNodesString);
+
+    List<TDataNodeConfiguration> otherOnlineDataNodes =
+        
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+            .filter(node -> !dataNodes.contains(node.getLocation()))
+            .collect(Collectors.toList());
+
+    for (TDataNodeConfiguration node : otherOnlineDataNodes) {
+      TCleanDataNodeCacheReq disableReq = new 
TCleanDataNodeCacheReq(dataNodes);
+      TSStatus status =
+          (TSStatus)
+              SyncDataNodeClientPool.getInstance()
+                  .sendSyncRequestToDataNodeWithRetry(
+                      node.getLocation().getInternalEndPoint(),
+                      disableReq,
+                      CnToDnRequestType.CLEAN_DATA_NODE_CACHE);
+      if (!isSucceed(status)) {
+        LOGGER.error(
+            "{}, BroadcastDataNodeStatusChange meets error, dataNode: {}, 
error: {}",
+            REMOVE_DATANODE_PROCESS,
+            dataNodesString,
+            status);
+        return;
+      }
+    }

Review Comment:
   Using AsyncDataNodeClientPool here would make it quicker.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {

Review Comment:
   Would you mind renaming this class to RemoveDataNodeHandler :) ? Then it'll 
be consistent with RegionMaintainHandler, and differentiated from the many other 
Managers in the ConfigNode.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -598,14 +607,109 @@ public boolean removeAINode(RemoveAINodePlan 
removeAINodePlan) {
     return true;
   }
 
-  // region region migration
+  public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> 
dataNodeLocations) {
+    // 1. Only one RemoveDataNodesProcedure is allowed in the cluster
+    Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
+        this.executor.getProcedures().values().stream()
+            .filter(
+                procedure -> {
+                  if (procedure instanceof RemoveDataNodesProcedure) {
+                    return !procedure.isFinished();
+                  }
+                  return false;
+                })
+            .findAny();
+
+    String failMessage = null;
+    if (anotherRemoveProcedure.isPresent()) {
+      List<TDataNodeLocation> anotherRemoveDataNodes =
+          ((RemoveDataNodesProcedure) 
anotherRemoveProcedure.get()).getRemovedDataNodes();
+      failMessage =
+          String.format(
+              "Submit RemoveDataNodesProcedure failed, "
+                  + "because another RemoveDataNodesProcedure %s is already in 
processing. "
+                  + "IoTDB is able to have at most 1 RemoveDataNodesProcedure 
at the same time. "
+                  + "For further information, please search [pid%d] in log. ",
+              anotherRemoveDataNodes, 
anotherRemoveProcedure.get().getProcId());
+    }
+
+    // 2. Check if the RemoveDataNodesProcedure conflicts with the 
RegionMigrateProcedure
+    RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
+    Set<TConsensusGroupId> removedDataNodesRegionSet =
+        manager.getRemovedDataNodesRegionSet(dataNodeLocations);
+    Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure 
=
+        this.executor.getProcedures().values().stream()
+            .filter(
+                procedure -> {
+                  if (procedure instanceof RegionMigrateProcedure) {
+                    RegionMigrateProcedure regionMigrateProcedure =
+                        (RegionMigrateProcedure) procedure;
+                    if (regionMigrateProcedure.isFinished()) {
+                      return false;
+                    }
+                    return removedDataNodesRegionSet.contains(
+                            regionMigrateProcedure.getConsensusGroupId())
+                        || 
dataNodeLocations.contains(regionMigrateProcedure.getDestDataNode());
+                  }
+                  return false;
+                })
+            .findAny();
+    if (conflictRegionMigrateProcedure.isPresent()) {
+      failMessage =
+          String.format(
+              "Submit RemoveDataNodesProcedure failed, "
+                  + "because another RegionMigrateProcedure %s is already in 
processing which conflicts with this RemoveDataNodesProcedure. "
+                  + "The RegionMigrateProcedure is migrating the region %s to 
the DataNode %s. "
+                  + "For further information, please search [pid%d] in log. ",
+              conflictRegionMigrateProcedure.get().getProcId(),
+              ((RegionMigrateProcedure) 
conflictRegionMigrateProcedure.get()).getConsensusGroupId(),
+              ((RegionMigrateProcedure) 
conflictRegionMigrateProcedure.get()).getDestDataNode(),
+              conflictRegionMigrateProcedure.get().getProcId());
+    }
+    // 3. Check if the RegionMigrateProcedure generated by 
RemoveDataNodesProcedure conflicts with
+    // each other

Review Comment:
   This is a problem when trying to remove multiple DataNodes at the same 
time. 
   
   When the number of regions is relatively large and their distribution is 
sufficiently even, it's likely that any two DataNodes will share the same 
regions, which causes conflicts. 
   
   I think a reasonable approach would be to submit the 
RegionMigrateProcedures in several rounds, but this doesn't have to be done in 
v1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to