liyuheng55555 commented on code in PR #13559:
URL: https://github.com/apache/iotdb/pull/13559#discussion_r1774808840
##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedureTest.java:
##########
@@ -29,21 +30,28 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-public class RemoveDataNodeProcedureTest {
+public class RemoveDataNodesProcedureTest {
@Test
public void serDeTest() throws IOException {
- RemoveDataNodeProcedure procedure0 =
- new RemoveDataNodeProcedure(
+ List<TDataNodeLocation> removedDataNodes =
+ Collections.singletonList(
new TDataNodeLocation(
10,
new TEndPoint("127.0.0.1", 6667),
new TEndPoint("127.0.0.1", 6668),
new TEndPoint("127.0.0.1", 6669),
new TEndPoint("127.0.0.1", 6670),
new TEndPoint("127.0.0.1", 6671)));
-
+ Map<Integer, NodeStatus> nodeStatusMap = new HashMap<>();
+ nodeStatusMap.put(10, NodeStatus.Running);
+ RemoveDataNodesProcedure procedure0 =
+ new RemoveDataNodesProcedure(removedDataNodes, nodeStatusMap);
Review Comment:
Is it necessary to add more tests here, e.g. for regionMigrationPlans?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+
+ private final ConfigManager configManager;
+
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
dataNodeClientManager;
+
+ public RemoveDataNodeManager(ConfigManager configManager) {
+ this.configManager = configManager;
+ dataNodeClientManager =
+ new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ }
+
+ /**
+ * Check if the data nodes are sufficient after removing.
+ *
+ * @param removedDataNodes List<TDataNodeLocation>
+ * @return true if the number of DataNodes is enough, false otherwise
+ */
+ public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation>
removedDataNodes) {
+ final int availableDatanodeSize =
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.ReadOnly)
+ .size();
+
+ int dataNodeNumAfterRemoving = availableDatanodeSize;
+ for (TDataNodeLocation removedDatanode : removedDataNodes) {
+ if
(configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+ != NodeStatus.Unknown) {
+ dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+ }
+ }
Review Comment:
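Could you double-check this check? Each iteration assigns
`availableDatanodeSize - 1` instead of decrementing, so removing several
DataNodes still subtracts only one.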
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -598,14 +607,109 @@ public boolean removeAINode(RemoveAINodePlan
removeAINodePlan) {
return true;
}
- // region region migration
+ public TSStatus checkRemoveDataNodes(List<TDataNodeLocation>
dataNodeLocations) {
+ // 1. Only one RemoveDataNodesProcedure is allowed in the cluster
+ Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
+ this.executor.getProcedures().values().stream()
+ .filter(
+ procedure -> {
+ if (procedure instanceof RemoveDataNodesProcedure) {
+ return !procedure.isFinished();
+ }
+ return false;
+ })
+ .findAny();
+
+ String failMessage = null;
+ if (anotherRemoveProcedure.isPresent()) {
+ List<TDataNodeLocation> anotherRemoveDataNodes =
+ ((RemoveDataNodesProcedure)
anotherRemoveProcedure.get()).getRemovedDataNodes();
+ failMessage =
+ String.format(
+ "Submit RemoveDataNodesProcedure failed, "
+ + "because another RemoveDataNodesProcedure %s is already in
processing. "
+ + "IoTDB is able to have at most 1 RemoveDataNodesProcedure
at the same time. "
+ + "For further information, please search [pid%d] in log. ",
+ anotherRemoveDataNodes,
anotherRemoveProcedure.get().getProcId());
+ }
+
+ // 2. Check if the RemoveDataNodesProcedure conflicts with the
RegionMigrateProcedure
+ RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
+ Set<TConsensusGroupId> removedDataNodesRegionSet =
+ manager.getRemovedDataNodesRegionSet(dataNodeLocations);
+ Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure
=
+ this.executor.getProcedures().values().stream()
+ .filter(
+ procedure -> {
+ if (procedure instanceof RegionMigrateProcedure) {
+ RegionMigrateProcedure regionMigrateProcedure =
+ (RegionMigrateProcedure) procedure;
+ if (regionMigrateProcedure.isFinished()) {
+ return false;
+ }
+ return removedDataNodesRegionSet.contains(
+ regionMigrateProcedure.getConsensusGroupId())
+ ||
dataNodeLocations.contains(regionMigrateProcedure.getDestDataNode());
+ }
+ return false;
+ })
+ .findAny();
+ if (conflictRegionMigrateProcedure.isPresent()) {
+ failMessage =
+ String.format(
+ "Submit RemoveDataNodesProcedure failed, "
+ + "because another RegionMigrateProcedure %s is already in
processing which conflicts with this RemoveDataNodesProcedure. "
+ + "The RegionMigrateProcedure is migrating the region %s to
the DataNode %s. "
+ + "For further information, please search [pid%d] in log. ",
+ conflictRegionMigrateProcedure.get().getProcId(),
+ ((RegionMigrateProcedure)
conflictRegionMigrateProcedure.get()).getConsensusGroupId(),
+ ((RegionMigrateProcedure)
conflictRegionMigrateProcedure.get()).getDestDataNode(),
+ conflictRegionMigrateProcedure.get().getProcId());
+ }
Review Comment:
"Check for an existing RegionMigrateProcedure when submitting a RemoveDataNodesProcedure"
"Check for an existing RemoveDataNodesProcedure when submitting a RegionMigrateProcedure"
Their logic should be similar, so maybe extract a shared function?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RemoveDataNodeManager.class);
+
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+
+ private final ConfigManager configManager;
+
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
dataNodeClientManager;
+
+ public RemoveDataNodeManager(ConfigManager configManager) {
+ this.configManager = configManager;
+ dataNodeClientManager =
+ new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ }
+
+ /**
+ * Check if the data nodes are sufficient after removing.
+ *
+ * @param removedDataNodes List<TDataNodeLocation>
+ * @return true if the number of DataNodes is enough, false otherwise
+ */
+ public boolean checkEnoughDataNodeAfterRemoving(List<TDataNodeLocation>
removedDataNodes) {
+ final int availableDatanodeSize =
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.ReadOnly)
+ .size();
+
+ int dataNodeNumAfterRemoving = availableDatanodeSize;
+ for (TDataNodeLocation removedDatanode : removedDataNodes) {
+ if
(configManager.getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
+ != NodeStatus.Unknown) {
+ dataNodeNumAfterRemoving = availableDatanodeSize - 1;
+ }
+ }
+
+ return dataNodeNumAfterRemoving >= NodeInfo.getMinimumDataNode();
+ }
+
+ /**
+ * Changes the status of the specified DataNode to the given status. This is
done to prevent the
+ * DataNode from receiving read or write requests when it is being removed
or is in a restricted
+ * state.
+ *
+ * @param dataNodeLocation the location of the DataNode whose status needs
to be changed
+ * @param status the new status to assign to the DataNode (e.g., Removing,
Running, etc.)
+ */
+ public void changeDataNodeStatus(TDataNodeLocation dataNodeLocation,
NodeStatus status) {
+ // Send request to update NodeStatus on the DataNode to be removed
+ if
(configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
+ == NodeStatus.Unknown) {
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNodeLocation.getInternalEndPoint(),
+ status.getStatus(),
+ CnToDnRequestType.SET_SYSTEM_STATUS,
+ 1);
+ } else {
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNodeLocation.getInternalEndPoint(),
+ status.getStatus(),
+ CnToDnRequestType.SET_SYSTEM_STATUS);
+ }
+
+ long currentTime = System.nanoTime();
+ // Force updating NodeStatus to NodeStatus.Removing
+ configManager
+ .getLoadManager()
+ .forceUpdateNodeCache(
+ NodeType.DataNode,
+ dataNodeLocation.getDataNodeId(),
+ new NodeHeartbeatSample(currentTime, status));
+ Map<TConsensusGroupId, Map<Integer, RegionHeartbeatSample>>
removingHeartbeatSampleMap =
+ new TreeMap<>();
+ // Force update RegionStatus to NodeStatus.Removing
+ configManager
+ .getPartitionManager()
+ .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+ .forEach(
+ replicaSet ->
+ removingHeartbeatSampleMap.put(
+ replicaSet.getRegionId(),
+ Collections.singletonMap(
+ dataNodeLocation.getDataNodeId(),
+ new RegionHeartbeatSample(currentTime,
RegionStatus.Removing))));
+
configManager.getLoadManager().forceUpdateRegionGroupCache(removingHeartbeatSampleMap);
+ }
+
+ /**
+ * Retrieves all region migration plans for the specified removed DataNodes.
+ *
+ * @param removedDataNodes the list of DataNodes from which to obtain
migration plans
+ * @return a list of region migration plans associated with the removed
DataNodes
+ */
+ public List<RegionMigrationPlan> getRegionMigrationPlans(
+ List<TDataNodeLocation> removedDataNodes) {
+ List<RegionMigrationPlan> regionMigrationPlans = new ArrayList<>();
+ for (TDataNodeLocation removedDataNode : removedDataNodes) {
+ List<TConsensusGroupId> migratedDataNodeRegions =
getMigratedDataNodeRegions(removedDataNode);
+ regionMigrationPlans.addAll(
+ migratedDataNodeRegions.stream()
+ .map(regionId -> RegionMigrationPlan.create(regionId,
removedDataNode))
+ .collect(Collectors.toList()));
+ }
+ return regionMigrationPlans;
+ }
+
+ /**
+ * Broadcasts DataNodes' status change, preventing disabled DataNodes from
accepting read or write
+ * requests.
+ *
+ * @param dataNodes the list of DataNodes that require broadcast status
changes
+ */
+ public void broadcastDataNodeStatusChange(List<TDataNodeLocation> dataNodes)
{
+ String dataNodesString =
+ dataNodes.stream()
+ .map(RegionMaintainHandler::getIdWithRpcEndpoint)
+ .collect(Collectors.joining(", "));
+ LOGGER.info(
+ "{}, BroadcastDataNodeStatusChange start, dataNode: {}",
+ REMOVE_DATANODE_PROCESS,
+ dataNodesString);
+
+ List<TDataNodeConfiguration> otherOnlineDataNodes =
+
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+ .filter(node -> !dataNodes.contains(node.getLocation()))
+ .collect(Collectors.toList());
+
+ for (TDataNodeConfiguration node : otherOnlineDataNodes) {
+ TCleanDataNodeCacheReq disableReq = new
TCleanDataNodeCacheReq(dataNodes);
+ TSStatus status =
+ (TSStatus)
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ node.getLocation().getInternalEndPoint(),
+ disableReq,
+ CnToDnRequestType.CLEAN_DATA_NODE_CACHE);
+ if (!isSucceed(status)) {
+ LOGGER.error(
+ "{}, BroadcastDataNodeStatusChange meets error, dataNode: {},
error: {}",
+ REMOVE_DATANODE_PROCESS,
+ dataNodesString,
+ status);
+ return;
+ }
+ }
Review Comment:
Using AsyncDataNodeClientPool here will make it quicker.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.CnToDnRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static org.apache.iotdb.db.service.RegionMigrateService.isFailed;
+import static org.apache.iotdb.db.service.RegionMigrateService.isSucceed;
+
+public class RemoveDataNodeManager {
Review Comment:
Would you mind renaming this class to RemoveDataNodeHandler :) ? Then it'll
be consistent with the RegionMaintainHandler, and differentiated from the many
other Managers in the ConfigNode.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -598,14 +607,109 @@ public boolean removeAINode(RemoveAINodePlan
removeAINodePlan) {
return true;
}
- // region region migration
+ public TSStatus checkRemoveDataNodes(List<TDataNodeLocation>
dataNodeLocations) {
+ // 1. Only one RemoveDataNodesProcedure is allowed in the cluster
+ Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
+ this.executor.getProcedures().values().stream()
+ .filter(
+ procedure -> {
+ if (procedure instanceof RemoveDataNodesProcedure) {
+ return !procedure.isFinished();
+ }
+ return false;
+ })
+ .findAny();
+
+ String failMessage = null;
+ if (anotherRemoveProcedure.isPresent()) {
+ List<TDataNodeLocation> anotherRemoveDataNodes =
+ ((RemoveDataNodesProcedure)
anotherRemoveProcedure.get()).getRemovedDataNodes();
+ failMessage =
+ String.format(
+ "Submit RemoveDataNodesProcedure failed, "
+ + "because another RemoveDataNodesProcedure %s is already in
processing. "
+ + "IoTDB is able to have at most 1 RemoveDataNodesProcedure
at the same time. "
+ + "For further information, please search [pid%d] in log. ",
+ anotherRemoveDataNodes,
anotherRemoveProcedure.get().getProcId());
+ }
+
+ // 2. Check if the RemoveDataNodesProcedure conflicts with the
RegionMigrateProcedure
+ RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
+ Set<TConsensusGroupId> removedDataNodesRegionSet =
+ manager.getRemovedDataNodesRegionSet(dataNodeLocations);
+ Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure
=
+ this.executor.getProcedures().values().stream()
+ .filter(
+ procedure -> {
+ if (procedure instanceof RegionMigrateProcedure) {
+ RegionMigrateProcedure regionMigrateProcedure =
+ (RegionMigrateProcedure) procedure;
+ if (regionMigrateProcedure.isFinished()) {
+ return false;
+ }
+ return removedDataNodesRegionSet.contains(
+ regionMigrateProcedure.getConsensusGroupId())
+ ||
dataNodeLocations.contains(regionMigrateProcedure.getDestDataNode());
+ }
+ return false;
+ })
+ .findAny();
+ if (conflictRegionMigrateProcedure.isPresent()) {
+ failMessage =
+ String.format(
+ "Submit RemoveDataNodesProcedure failed, "
+ + "because another RegionMigrateProcedure %s is already in
processing which conflicts with this RemoveDataNodesProcedure. "
+ + "The RegionMigrateProcedure is migrating the region %s to
the DataNode %s. "
+ + "For further information, please search [pid%d] in log. ",
+ conflictRegionMigrateProcedure.get().getProcId(),
+ ((RegionMigrateProcedure)
conflictRegionMigrateProcedure.get()).getConsensusGroupId(),
+ ((RegionMigrateProcedure)
conflictRegionMigrateProcedure.get()).getDestDataNode(),
+ conflictRegionMigrateProcedure.get().getProcId());
+ }
+ // 3. Check if the RegionMigrateProcedure generated by
RemoveDataNodesProcedure conflicts with
+ // each other
Review Comment:
This is a problem when trying to remove multiple datanodes at the same
time.
When the number of regions is relatively large and their distribution is
sufficiently even, it’s likely that any two datanodes will share the same
regions, which causes a conflict.
I think a reasonable approach would be to submit the
RegionMigrateProcedures in several rounds, but this doesn’t have to be done in
v1.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]