OneSizeFitsQuorum commented on code in PR #14634:
URL: https://github.com/apache/iotdb/pull/14634#discussion_r1914369480
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -760,23 +727,179 @@ private TSStatus checkRegionMigrate(
String.format(
"Submit RegionMigrateProcedure failed, because the target
DataNode %s already contains Region %s",
migrateRegionReq.getToId(), migrateRegionReq.getRegionId());
- } else if (!configManager
- .getNodeManager()
- .filterDataNodeThroughStatus(NodeStatus.Running)
+ }
+
+ if (failMessage != null) {
+ LOGGER.warn(failMessage);
+ TSStatus failStatus = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ failStatus.setMessage(failMessage);
+ return failStatus;
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus checkReconstructRegion(
+ TReconstructRegionReq req,
+ TConsensusGroupId regionId,
+ TDataNodeLocation targetDataNode,
+ TDataNodeLocation coordinator) {
+ String failMessage =
+ regionOperationCommonCheck(
+ regionId,
+ targetDataNode,
+ Arrays.asList(
+ new Pair<>("Target DataNode", targetDataNode),
Review Comment:
try to do not use magic string?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java:
##########
@@ -4177,6 +4181,30 @@ public Statement
visitMigrateRegion(IoTDBSqlParser.MigrateRegionContext ctx) {
Integer.parseInt(ctx.toId.getText()));
}
+ @Override
+ public Statement
visitReconstructRegion(IoTDBSqlParser.ReconstructRegionContext ctx) {
+ int dataNodeId = Integer.parseInt(ctx.targetDataNodeId.getText());
+ List<Integer> regionIds =
+ ctx.INTEGER_LITERAL().stream()
+ .map(ParseTree::getText)
+ .map(Integer::parseInt)
+ .collect(Collectors.toList());
+ regionIds.remove(regionIds.size() - 1);
Review Comment:
try to find a better way like this?
<img width="666" alt="image"
src="https://github.com/user-attachments/assets/e7438825-1f24-47af-85d9-7553ed8f6087"
/>
##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDB_RegionReconstruct_IoTV1_IT.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.commit;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.read.common.RowRecord;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
+
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDB_RegionReconstruct_IoTV1_IT extends
IoTDBRegionOperationReliabilityITFramework {
+ private static final String RECONSTRUCT_FORMAT = "reconstruct region %d on
%d";
+ private static Logger LOGGER =
LoggerFactory.getLogger(IoTDB_RegionReconstruct_IoTV1_IT.class);
+
+ @Test
+ public void normal1C3DTest() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataReplicationFactor(2)
+ .setSchemaReplicationFactor(3);
+
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+
+ try (Connection connection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ Statement statement = makeItCloseQuietly(connection.createStatement());
+ SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // prepare data
+ statement.execute(INSERTION1);
+ statement.execute(FLUSH_COMMAND);
+
+ // collect necessary information
+ Map<Integer, Set<Integer>> dataRegionMap = getDataRegionMap(statement);
+ Set<Integer> allDataNodeId = getAllDataNodes(statement);
+
+ // select datanode
+ final int selectedRegion = 1;
+ Assert.assertTrue(dataRegionMap.containsKey(selectedRegion));
+ Assert.assertEquals(2, dataRegionMap.get(selectedRegion).size());
+ Iterator<Integer> iterator =
dataRegionMap.get(selectedRegion).iterator();
+ final int dataNodeToBeClosed = iterator.next();
+ final int dataNodeToBeReconstructed = iterator.next();
+ final int dataNodeAlwaysGood =
+ allDataNodeId.stream()
+ .filter(x -> x != dataNodeToBeReconstructed && x !=
dataNodeToBeClosed)
+ .findAny()
+ .get();
+ final DataNodeWrapper dataNodeWrapper =
+ EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeAlwaysGood).get();
+ Session session =
+ new Session.Builder()
+ .host(dataNodeWrapper.getIp())
+ .port(dataNodeWrapper.getPort())
+ .build();
+ session.open();
+
+ // delete one DataNode's data dir, stop another DataNode
+ FileUtils.deleteDirectory(
+ new File(
+ EnvFactory.getEnv()
+ .dataNodeIdToWrapper(dataNodeToBeReconstructed)
+ .get()
+ .getDataPath()));
+
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeToBeClosed).get().stopForcibly();
+
+ // now, the query should throw exception
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () -> session.executeQueryStatement("select * from root.**"));
+
+ // start DataNode, reconstruct the delete one
+
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeToBeClosed).get().start();
+ EnvFactory.getAbstractEnv().checkNodeInStatus(dataNodeToBeClosed,
NodeStatus.Running);
+ session.executeNonQueryStatement(
+ String.format(RECONSTRUCT_FORMAT, selectedRegion,
dataNodeToBeReconstructed));
+ Thread.sleep(5000);
Review Comment:
do we need this line as you already use Awaitility.await()
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -903,6 +1035,141 @@ public TSStatus migrateRegion(TMigrateRegionReq
migrateRegionReq) {
}
}
+ public TSStatus reconstructRegion(TReconstructRegionReq req) {
+ RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ final TDataNodeLocation targetDataNode =
+
configManager.getNodeManager().getRegisteredDataNode(req.getDataNodeId()).getLocation();
+ try (AutoCloseableLock ignoredLock =
+ AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) {
+ List<ReconstructRegionProcedure> procedures = new ArrayList<>();
+ for (int x : req.getRegionIds()) {
+ TConsensusGroupId regionId =
+ configManager
+ .getPartitionManager()
+ .generateTConsensusGroupIdByRegionId(x)
+ .orElseThrow(() -> new IllegalArgumentException("Region id " +
x + " is invalid"));
+ final TDataNodeLocation coordinator =
+ handler
+ .filterDataNodeWithOtherRegionReplica(
+ regionId,
+ targetDataNode,
+ NodeStatus.Running,
+ NodeStatus.Removing,
+ NodeStatus.ReadOnly)
+ .orElse(null);
+ TSStatus status = checkReconstructRegion(req, regionId,
targetDataNode, coordinator);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ procedures.add(new ReconstructRegionProcedure(regionId,
targetDataNode, coordinator));
+ }
+ // all checks pass, submit all procedures
+ procedures.forEach(
+ reconstructRegionProcedure -> {
+ this.executor.submitProcedure(reconstructRegionProcedure);
+ LOGGER.info(
+ "[ReconstructRegion] Submit ReconstructRegionProcedure
successfully, {}",
+ reconstructRegionProcedure);
+ });
+ }
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ public TSStatus extendRegion(TExtendRegionReq req) {
+ try (AutoCloseableLock ignoredLock =
+ AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) {
+ TConsensusGroupId regionId;
+ Optional<TConsensusGroupId> optional =
+ configManager
+ .getPartitionManager()
+ .generateTConsensusGroupIdByRegionId(req.getRegionId());
+ if (optional.isPresent()) {
+ regionId = optional.get();
+ } else {
+ LOGGER.error("get region group id fail");
+ return new TSStatus(TSStatusCode.EXTEND_REGION_ERROR.getStatusCode())
+ .setMessage("get region group id fail");
+ }
+
+ // find target dn
+ final TDataNodeLocation targetDataNode =
+
configManager.getNodeManager().getRegisteredDataNode(req.getDataNodeId()).getLocation();
+ // select coordinator for adding peer
+ RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ // TODO: choose the DataNode which has lowest load
+ final TDataNodeLocation coordinator =
+ handler
+ .filterDataNodeWithOtherRegionReplica(
+ regionId,
+ targetDataNode,
+ NodeStatus.Running,
+ NodeStatus.Removing,
+ NodeStatus.ReadOnly)
+ .orElse(null);
+ // do the check
+ TSStatus status = checkExtendRegion(req, regionId, targetDataNode,
coordinator);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ // submit procedure
+ AddRegionPeerProcedure procedure =
+ new AddRegionPeerProcedure(regionId, coordinator, targetDataNode);
+ this.executor.submitProcedure(procedure);
+ LOGGER.info("[ExtendRegion] Submit AddRegionPeerProcedure successfully:
{}", procedure);
+
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+ }
+
+ public TSStatus removeRegion(TRemoveRegionReq req) {
+ try (AutoCloseableLock ignoredLock =
+ AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) {
+ TConsensusGroupId regionId;
+ Optional<TConsensusGroupId> optional =
+ configManager
+ .getPartitionManager()
+ .generateTConsensusGroupIdByRegionId(req.getRegionId());
+ if (optional.isPresent()) {
+ regionId = optional.get();
+ } else {
+ LOGGER.error("get region group id fail");
+ return new
TSStatus(TSStatusCode.REMOVE_REGION_PEER_ERROR.getStatusCode())
+ .setMessage("get region group id fail");
+ }
+
+ // find target dn
+ final TDataNodeLocation targetDataNode =
+
configManager.getNodeManager().getRegisteredDataNode(req.getDataNodeId()).getLocation();
+
+ // select coordinator for removing peer
+ RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ final TDataNodeLocation coordinator =
Review Comment:
use current leader now?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java:
##########
@@ -37,10 +37,11 @@ public enum ProcedureType {
DELETE_DATABASE_PROCEDURE((short) 200),
REGION_MIGRATE_PROCEDURE((short) 201),
CREATE_REGION_GROUPS((short) 202),
+ ADD_REGION_PEER_PROCEDURE((short) 203),
+ REMOVE_REGION_PEER_PROCEDURE((short) 204),
+ RECONSTRUCT_REGION_PROCEDURE((short) 205),
Review Comment:
Consider compatibility. Perhaps you could set RECONSTRUCT_REGION_PROCEDURE
to 203
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -903,6 +1035,141 @@ public TSStatus migrateRegion(TMigrateRegionReq
migrateRegionReq) {
}
}
+ public TSStatus reconstructRegion(TReconstructRegionReq req) {
+ RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ final TDataNodeLocation targetDataNode =
+
configManager.getNodeManager().getRegisteredDataNode(req.getDataNodeId()).getLocation();
+ try (AutoCloseableLock ignoredLock =
+ AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) {
+ List<ReconstructRegionProcedure> procedures = new ArrayList<>();
+ for (int x : req.getRegionIds()) {
+ TConsensusGroupId regionId =
+ configManager
+ .getPartitionManager()
+ .generateTConsensusGroupIdByRegionId(x)
+ .orElseThrow(() -> new IllegalArgumentException("Region id " +
x + " is invalid"));
+ final TDataNodeLocation coordinator =
+ handler
+ .filterDataNodeWithOtherRegionReplica(
+ regionId,
+ targetDataNode,
+ NodeStatus.Running,
+ NodeStatus.Removing,
+ NodeStatus.ReadOnly)
+ .orElse(null);
+ TSStatus status = checkReconstructRegion(req, regionId,
targetDataNode, coordinator);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ procedures.add(new ReconstructRegionProcedure(regionId,
targetDataNode, coordinator));
+ }
+ // all checks pass, submit all procedures
+ procedures.forEach(
+ reconstructRegionProcedure -> {
+ this.executor.submitProcedure(reconstructRegionProcedure);
+ LOGGER.info(
+ "[ReconstructRegion] Submit ReconstructRegionProcedure
successfully, {}",
+ reconstructRegionProcedure);
+ });
+ }
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ public TSStatus extendRegion(TExtendRegionReq req) {
+ try (AutoCloseableLock ignoredLock =
+ AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) {
+ TConsensusGroupId regionId;
+ Optional<TConsensusGroupId> optional =
+ configManager
+ .getPartitionManager()
+ .generateTConsensusGroupIdByRegionId(req.getRegionId());
+ if (optional.isPresent()) {
+ regionId = optional.get();
+ } else {
+ LOGGER.error("get region group id fail");
+ return new TSStatus(TSStatusCode.EXTEND_REGION_ERROR.getStatusCode())
+ .setMessage("get region group id fail");
+ }
+
+ // find target dn
+ final TDataNodeLocation targetDataNode =
+
configManager.getNodeManager().getRegisteredDataNode(req.getDataNodeId()).getLocation();
+ // select coordinator for adding peer
+ RegionMaintainHandler handler = env.getRegionMaintainHandler();
+ // TODO: choose the DataNode which has lowest load
+ final TDataNodeLocation coordinator =
+ handler
+ .filterDataNodeWithOtherRegionReplica(
Review Comment:
use current leader now?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -760,23 +727,179 @@ private TSStatus checkRegionMigrate(
String.format(
"Submit RegionMigrateProcedure failed, because the target
DataNode %s already contains Region %s",
migrateRegionReq.getToId(), migrateRegionReq.getRegionId());
- } else if (!configManager
- .getNodeManager()
- .filterDataNodeThroughStatus(NodeStatus.Running)
+ }
+
+ if (failMessage != null) {
+ LOGGER.warn(failMessage);
+ TSStatus failStatus = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ failStatus.setMessage(failMessage);
+ return failStatus;
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus checkReconstructRegion(
+ TReconstructRegionReq req,
+ TConsensusGroupId regionId,
+ TDataNodeLocation targetDataNode,
+ TDataNodeLocation coordinator) {
+ String failMessage =
+ regionOperationCommonCheck(
+ regionId,
+ targetDataNode,
+ Arrays.asList(
+ new Pair<>("Target DataNode", targetDataNode),
+ new Pair<>("Coordinator", coordinator)));
+
+ ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
+ if (configManager
+ .getPartitionManager()
+ .getAllReplicaSetsMap(regionId.getType())
+ .get(regionId)
+ .getDataNodeLocationsSize()
+ == 1) {
+ failMessage = String.format("%s only has 1 replica, it cannot be
reconstructed", regionId);
+ } else if (configManager
+ .getPartitionManager()
+ .getAllReplicaSets(targetDataNode.getDataNodeId())
+ .stream()
+ .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionId))) {
+ failMessage =
+ String.format(
+ "Submit ReconstructRegionProcedure failed, because the target
DataNode %s doesn't contain Region %s",
+ req.getDataNodeId(), regionId);
+ }
+
+ if (failMessage != null) {
+ LOGGER.warn(failMessage);
+ TSStatus failStatus = new
TSStatus(TSStatusCode.RECONSTRUCT_REGION_ERROR.getStatusCode());
+ failStatus.setMessage(failMessage);
+ return failStatus;
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus checkExtendRegion(
+ TExtendRegionReq req,
+ TConsensusGroupId regionId,
+ TDataNodeLocation targetDataNode,
+ TDataNodeLocation coordinator) {
+ String failMessage =
+ regionOperationCommonCheck(
+ regionId,
+ targetDataNode,
+ Arrays.asList(
+ new Pair<>("Target DataNode", targetDataNode),
Review Comment:
same
##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDB_RegionReconstruct_IoTV1_IT.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.commit;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.read.common.RowRecord;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
+
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDB_RegionReconstruct_IoTV1_IT extends
IoTDBRegionOperationReliabilityITFramework {
Review Comment:
Don't mix camel case nomenclature
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -760,23 +727,179 @@ private TSStatus checkRegionMigrate(
String.format(
"Submit RegionMigrateProcedure failed, because the target
DataNode %s already contains Region %s",
migrateRegionReq.getToId(), migrateRegionReq.getRegionId());
- } else if (!configManager
- .getNodeManager()
- .filterDataNodeThroughStatus(NodeStatus.Running)
+ }
+
+ if (failMessage != null) {
+ LOGGER.warn(failMessage);
+ TSStatus failStatus = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ failStatus.setMessage(failMessage);
+ return failStatus;
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus checkReconstructRegion(
+ TReconstructRegionReq req,
+ TConsensusGroupId regionId,
+ TDataNodeLocation targetDataNode,
+ TDataNodeLocation coordinator) {
+ String failMessage =
+ regionOperationCommonCheck(
+ regionId,
+ targetDataNode,
+ Arrays.asList(
+ new Pair<>("Target DataNode", targetDataNode),
+ new Pair<>("Coordinator", coordinator)));
+
+ ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
+ if (configManager
+ .getPartitionManager()
+ .getAllReplicaSetsMap(regionId.getType())
+ .get(regionId)
+ .getDataNodeLocationsSize()
+ == 1) {
+ failMessage = String.format("%s only has 1 replica, it cannot be
reconstructed", regionId);
+ } else if (configManager
+ .getPartitionManager()
+ .getAllReplicaSets(targetDataNode.getDataNodeId())
+ .stream()
+ .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionId))) {
+ failMessage =
+ String.format(
+ "Submit ReconstructRegionProcedure failed, because the target
DataNode %s doesn't contain Region %s",
+ req.getDataNodeId(), regionId);
+ }
+
+ if (failMessage != null) {
+ LOGGER.warn(failMessage);
+ TSStatus failStatus = new
TSStatus(TSStatusCode.RECONSTRUCT_REGION_ERROR.getStatusCode());
+ failStatus.setMessage(failMessage);
+ return failStatus;
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus checkExtendRegion(
+ TExtendRegionReq req,
+ TConsensusGroupId regionId,
+ TDataNodeLocation targetDataNode,
+ TDataNodeLocation coordinator) {
+ String failMessage =
+ regionOperationCommonCheck(
+ regionId,
+ targetDataNode,
+ Arrays.asList(
+ new Pair<>("Target DataNode", targetDataNode),
+ new Pair<>("Coordinator", coordinator)));
+ if (configManager
+ .getPartitionManager()
+ .getAllReplicaSets(targetDataNode.getDataNodeId())
+ .stream()
+ .anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionId))) {
+ failMessage =
+ String.format(
+ "Target DataNode %s already contains region %s",
+ targetDataNode.getDataNodeId(), req.getRegionId());
+ }
+
+ if (failMessage != null) {
+ LOGGER.warn(failMessage);
+ TSStatus failStatus = new
TSStatus(TSStatusCode.RECONSTRUCT_REGION_ERROR.getStatusCode());
+ failStatus.setMessage(failMessage);
+ return failStatus;
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus checkRemoveRegion(
+ TRemoveRegionReq req,
+ TConsensusGroupId regionId,
+ TDataNodeLocation targetDataNode,
+ TDataNodeLocation coordinator) {
+ String failMessage =
+ regionOperationCommonCheck(
+ regionId,
+ targetDataNode,
+ Arrays.asList(
+ new Pair<>("Target DataNode", targetDataNode),
Review Comment:
same
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -794,39 +917,48 @@ private TSStatus checkRegionMigrate(
((RemoveDataNodesProcedure)
conflictRemoveDataNodesProcedure.get()).getRemovedDataNodes();
Set<TConsensusGroupId> removedDataNodesRegionSet =
removeDataNodeHandler.getRemovedDataNodesRegionSet(removedDataNodes);
- if (removedDataNodesRegionSet.contains(regionGroupId)) {
- failMessage =
- String.format(
- "Submit RegionMigrateProcedure failed, "
- + "because another RemoveDataNodesProcedure %s is already
in processing which conflicts with this RegionMigrateProcedure. "
- + "The RemoveDataNodesProcedure is removing the DataNodes
%s which contains the region %s. "
- + "For further information, please search [pid%d] in log.
",
- conflictRemoveDataNodesProcedure.get().getProcId(),
- removedDataNodes,
- regionGroupId,
- conflictRemoveDataNodesProcedure.get().getProcId());
- } else if (removedDataNodes.contains(destDataNode)) {
- failMessage =
- String.format(
- "Submit RegionMigrateProcedure failed, "
- + "because another RemoveDataNodesProcedure %s is already
in processing which conflicts with this RegionMigrateProcedure. "
- + "The RemoveDataNodesProcedure is removing the target
DataNode %s. "
- + "For further information, please search [pid%d] in log.
",
- conflictRemoveDataNodesProcedure.get().getProcId(),
- destDataNode,
- conflictRemoveDataNodesProcedure.get().getProcId());
+ if (removedDataNodesRegionSet.contains(regionId)) {
+ return String.format(
+ "Another RemoveDataNodesProcedure %s is already in processing
which conflicts with this procedure. "
+ + "The RemoveDataNodesProcedure is removing the DataNodes %s
which contains the region %s. "
+ + "For further information, please search [pid%d] in log. ",
+ conflictRemoveDataNodesProcedure.get().getProcId(),
+ removedDataNodes,
+ regionId,
+ conflictRemoveDataNodesProcedure.get().getProcId());
+ } else if (removedDataNodes.contains(targetDataNode)) {
+ return String.format(
+ "Another RemoveDataNodesProcedure %s is already in processing
which conflicts with this procedure. "
+ + "The RemoveDataNodesProcedure is removing the target
DataNode %s. "
+ + "For further information, please search [pid%d] in log. ",
+ conflictRemoveDataNodesProcedure.get().getProcId(),
+ targetDataNode,
+ conflictRemoveDataNodesProcedure.get().getProcId());
}
}
+ return null;
+ }
- if (failMessage != null) {
- LOGGER.warn(failMessage);
- TSStatus failStatus = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- failStatus.setMessage(failMessage);
- return failStatus;
+ private String checkRegionOperationDuplication(TConsensusGroupId regionId) {
+ List<? extends RegionOperationProcedure<?>>
otherRegionMemberChangeProcedures =
+ getExecutor().getProcedures().values().stream()
+ .filter(procedure -> !procedure.isFinished())
+ .filter(procedure -> procedure instanceof RegionOperationProcedure)
+ .map(procedure -> (RegionOperationProcedure<?>) procedure)
+ .filter(
+ regionMemberChangeProcedure ->
+ regionId.equals(regionMemberChangeProcedure.getRegionId()))
+ .collect(Collectors.toList());
+ if (!otherRegionMemberChangeProcedures.isEmpty()) {
+ return String.format(
+ "%s has some other region operation procedures in progress, their
procedure id is: %s",
+ regionId, otherRegionMemberChangeProcedures);
}
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ return null;
}
+ // end region
+
public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
env.getSubmitRegionMigrateLock().lock();
Review Comment:
use try with resource too?
##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDB_RegionGroupExpandAndShrink_IoTV1_IT.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.commit;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
+
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDB_RegionGroupExpandAndShrink_IoTV1_IT
Review Comment:
Don't mix camel case nomenclature
##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDB_RegionGroupExpandAndShrink_IoTV1_IT.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.commit;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
+
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDB_RegionGroupExpandAndShrink_IoTV1_IT
+ extends IoTDBRegionOperationReliabilityITFramework {
+ private static final String EXPAND_FORMAT = "extend region %d to %d";
+ private static final String SHRINK_FORMAT = "remove region %d from %d";
+
+ private static Logger LOGGER =
+ LoggerFactory.getLogger(IoTDB_RegionGroupExpandAndShrink_IoTV1_IT.class);
+
+ /**
+ * 1. Expand: {a} -> {a,b} -> ... -> {a,b,c,d,e}
Review Comment:
lack of `<p>`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]