OneSizeFitsQuorum commented on code in PR #12165:
URL: https://github.com/apache/iotdb/pull/12165#discussion_r1525978921
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java:
##########
@@ -53,15 +53,17 @@ public enum ConfigPhysicalPlanType {
CreateRegionGroups((short) 300),
DeleteRegionGroups((short) 301),
GetRegionInfoList((short) 302),
+ @Deprecated
UpdateRegionLocation((short) 303),
Review Comment:
consider compatibility
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -535,125 +537,136 @@ public boolean removeDataNode(RemoveDataNodePlan
removeDataNodePlan) {
return true;
}
- public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
- TConsensusGroupId regionGroupId;
+ // region region migration
+
+ private TConsensusGroupId idToTConsensusGroupId(final int regionId) throws
ProcedureException {
Review Comment:
regionIdToTConsensusGroupId
##########
integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.DataNodeKillPoints;
+import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
+import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
+import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
+import org.apache.iotdb.itbase.exception.InconsistentDataException;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentHashMap.KeySetView;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public class IoTDBRegionMigrateReliabilityIT {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBRegionMigrateReliabilityIT.class);
+ private static final String INSERTION =
+ "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 10.1,
20.7)";
+ private static final String SHOW_REGIONS = "show regions";
+ private static final String SHOW_DATANODES = "show datanodes";
+ private static final String REGION_MIGRATE_COMMAND_FORMAT = "migrate region
%d from %d to %d";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ // region Normal tests
+
+ @Test
+ public void normal1C2DTest() throws Exception {
+ generalTest(1, 1, 1, 2, buildSet(), buildSet());
+ }
+
+ @Test
+ public void normal3C3DTest() throws Exception {
+ generalTest(2, 3, 3, 3, buildSet(), buildSet());
+ }
+
+ // endregion
+
+ // region ConfigNode crash tests
+ @Test
+ public void cnCrashDuringPreCheck() throws Exception {
+ generalTest(
+ 1, 1, 1, 2,
buildSet(RegionTransitionState.REGION_MIGRATE_PREPARE.toString()), buildSet());
+ }
+
+ @Test
+ public void cnCrashDuringCreatePeer() throws Exception {
+ generalTest(
+ 1, 1, 1, 2,
buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER.toString()), buildSet());
+ }
+
+ @Test
+ public void cnCrashDuringDoAddPeer() throws Exception {
+ generalTest(1, 1, 1, 2,
buildSet(AddRegionPeerState.DO_ADD_REGION_PEER.toString()), buildSet());
+ }
+
+ @Test
+ public void cnCrashDuringUpdateCache() throws Exception {
+ generalTest(
+ 1,
+ 1,
+ 1,
+ 2,
+ buildSet(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE.toString()),
+ buildSet());
+ }
+
+ @Test
+ public void cnCrashDuringChangeRegionLeader() throws Exception {
+ generalTest(
+ 1, 1, 1, 2,
buildSet(RegionTransitionState.CHANGE_REGION_LEADER.toString()), buildSet());
+ }
+
+ @Test
+ public void cnCrashDuringRemoveRegionPeer() throws Exception {
+ generalTest(
+ 1, 1, 1, 2,
buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER.toString()), buildSet());
+ }
+
+ @Test
+ public void cnCrashDuringDeleteOldRegionPeer() throws Exception {
+ generalTest(
+ 1, 1, 1, 2,
buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER.toString()), buildSet());
+ }
+
+ @Test
+ public void cnCrashDuringRemoveRegionLocationCache() throws Exception {
+ generalTest(
+ 1,
+ 1,
+ 1,
+ 2,
+
buildSet(RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE.toString()),
+ buildSet());
+ }
+
+ @Test
+ public void cnCrashTest() throws Exception {
+ KeySetView<String, Boolean> killConfigNodeKeywords = buildSet();
+ killConfigNodeKeywords.addAll(
+ Arrays.stream(AddRegionPeerState.values())
+ .map(Enum::toString)
+ .collect(Collectors.toList()));
+ killConfigNodeKeywords.addAll(
+ Arrays.stream(RemoveRegionPeerState.values())
+ .map(Enum::toString)
+ .collect(Collectors.toList()));
+ generalTest(1, 1, 1, 2, killConfigNodeKeywords, buildSet());
+ }
+
+ @Ignore
+ @Test
+ public void badKillPoint() throws Exception {
+ generalTest(1, 1, 1, 2, buildSet("??"), buildSet());
+ }
+
+ // endregion
+
+ // region coordinator DataNode crash tests
+
+ @Test
+ public void coordinatorCrashDuringRemovePeer() throws Exception {
+ generalTest(1, 1, 1, 2, buildSet(),
buildSet(DataNodeKillPoints.CoordinatorRemovePeer.name()));
+ }
+
+ // endregion
+
+ // region original DataNode crash tests
+
+ @Test
+ public void originalCrashDuringRemovePeer() throws Exception {
+ generalTest(1, 1, 1, 2, buildSet(),
buildSet(DataNodeKillPoints.OriginalRemovePeer.name()));
+ }
+
+ @Test
+ public void originalCrashDuringDeleteLocalPeer() throws Exception {
+ generalTest(
+ 1, 1, 1, 2, buildSet(),
buildSet(DataNodeKillPoints.OriginalDeleteOldRegionPeer.name()));
+ }
+
+ // region Helpers
+
+ public void generalTest(
+ final int dataReplicateFactor,
+ final int schemaReplicationFactor,
+ final int configNodeNum,
+ final int dataNodeNum,
+ KeySetView<String, Boolean> killConfigNodeKeywords,
+ KeySetView<String, Boolean> killDataNodeKeywords // TODO:此参数尚未生效
Review Comment:
remove todo
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java:
##########
@@ -234,8 +213,10 @@ public void deserialize(ByteBuffer byteBuffer) {
originalDataNode =
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
destDataNode =
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
consensusGroupId =
ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+ coordinatorForAddPeer =
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
Review Comment:
consider compatibility
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -635,22 +623,68 @@ public void recoverConfiguration() {
// interrupted
// unexpectedly, we need substitute configuration with tmpConfiguration
file
if (Files.exists(tmpConfigurationPath)) {
- if (Files.exists(configurationPath)) {
- Files.delete(configurationPath);
- }
+ Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
}
- buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
- int size = buffer.getInt();
- for (int i = 0; i < size; i++) {
- configuration.add(Peer.deserialize(buffer));
+ if (Files.exists(configurationPath)) {
Review Comment:
How about adding a function that forces conversion of the .dat file, so that
everything below only restores from a single file? Then, after a few versions,
you can consider removing the function.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java:
##########
@@ -139,6 +168,34 @@ public synchronized boolean
submitDeleteOldRegionPeerTask(TMaintainPeerReq req)
return submitSucceed;
}
+ public synchronized TSStatus resetPeerList(TResetPeerListReq req) {
+ List<Peer> correctPeers =
+ req.getCorrectLocations().stream()
+ .map(location -> Peer.valueOf(req.getRegionId(), location))
+ .collect(Collectors.toList());
+ ConsensusGroupId regionId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
+ try {
+ if (regionId instanceof DataRegionId) {
+ DataRegionConsensusImpl.getInstance().resetPeerList(regionId,
correctPeers);
+ } else {
+ SchemaRegionConsensusImpl.getInstance().resetPeerList(regionId,
correctPeers);
+ }
+ } catch (ConsensusException e) {
+ LOGGER.error("reset peer list fail", e);
+ return new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private boolean addToTaskResultMap(long taskId) {
Review Comment:
use compute API
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -635,22 +623,68 @@ public void recoverConfiguration() {
// interrupted
// unexpectedly, we need substitute configuration with tmpConfiguration
file
if (Files.exists(tmpConfigurationPath)) {
- if (Files.exists(configurationPath)) {
- Files.delete(configurationPath);
- }
+ Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
}
- buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
- int size = buffer.getInt();
- for (int i = 0; i < size; i++) {
- configuration.add(Peer.deserialize(buffer));
+ if (Files.exists(configurationPath)) {
+ // recover from old configuration file
+ buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
+ int size = buffer.getInt();
+ for (int i = 0; i < size; i++) {
+ configuration.add(Peer.deserialize(buffer));
+ }
+ Files.delete(configurationPath);
+ persistConfiguration();
Review Comment:
persist first, delete old configuration last
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java:
##########
@@ -69,8 +80,16 @@ public class DataNodeRemoveHandler {
/** region migrate lock */
private final LockQueue regionMigrateLock = new LockQueue();
- public DataNodeRemoveHandler(ConfigManager configManager) {
+ private static final long DATANODE_MAX_DISCONNECTION_MS = 10000;
Review Comment:
fetch from LoadManager
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java:
##########
@@ -207,15 +207,18 @@ public boolean verifySucceed(TSStatus... status) {
.allMatch(tsStatus -> tsStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) {
- return getNodeManager()
- .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.ReadOnly)
- .size()
- - Boolean.compare(
- getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
- != NodeStatus.Unknown,
- false)
- >= NodeInfo.getMinimumDataNode();
+ public boolean checkEnoughDataNodeAfterRemoving(TDataNodeLocation
removedDatanode) {
+ final int runningOrReadOnlyDataNodeNum =
+ getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running,
NodeStatus.ReadOnly)
+ .size();
+ int dataNodeNumAfterRemoving;
+ if (getLoadManager().getNodeStatus(removedDatanode.getDataNodeId()) !=
NodeStatus.Unknown) {
Review Comment:
consider REMOVING status
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java:
##########
@@ -279,17 +279,21 @@ private void recordRegionPriorityMap(
regionPriorityEntry : priorityMap.entrySet()) {
if (!Objects.equals(
regionPriorityEntry.getValue().getRight(),
regionPriorityEntry.getValue().getLeft())) {
- LOGGER.info(
- "[RegionPriority]\t {}: {}->{}",
- regionPriorityEntry.getKey(),
- regionPriorityEntry.getValue().getLeft() == null
- ? "null"
- :
regionPriorityEntry.getValue().getLeft().getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toList()),
-
regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toList()));
+ try {
+ LOGGER.info(
+ "[RegionPriority]\t {}: {}->{}",
+ regionPriorityEntry.getKey(),
+ regionPriorityEntry.getValue().getLeft() == null
+ ? "null"
+ :
regionPriorityEntry.getValue().getLeft().getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()),
+
regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()));
+ } catch (Exception e) {
Review Comment:
@CRZbulabula PTAL
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java:
##########
@@ -535,125 +537,136 @@ public boolean removeDataNode(RemoveDataNodePlan
removeDataNodePlan) {
return true;
}
- public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
- TConsensusGroupId regionGroupId;
+ // region region migration
+
+ private TConsensusGroupId idToTConsensusGroupId(final int regionId) throws
ProcedureException {
if (configManager
.getPartitionManager()
- .isRegionGroupExists(
- new TConsensusGroupId(
- TConsensusGroupType.SchemaRegion,
migrateRegionReq.getRegionId()))) {
- regionGroupId =
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion,
migrateRegionReq.getRegionId());
- } else if (configManager
+ .isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
+ return new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId);
+ }
+ if (configManager
.getPartitionManager()
- .isRegionGroupExists(
- new TConsensusGroupId(
- TConsensusGroupType.DataRegion,
migrateRegionReq.getRegionId()))) {
- regionGroupId =
- new TConsensusGroupId(TConsensusGroupType.DataRegion,
migrateRegionReq.getRegionId());
- } else {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because RegionGroup: {}
doesn't exist",
- migrateRegionReq.getRegionId());
- TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- String.format(
- "Submit RegionMigrateProcedure failed, because RegionGroup: %s
doesn't exist",
- migrateRegionReq.getRegionId()));
- return status;
+ .isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
+ return new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId);
}
+ String msg =
+ String.format(
+ "Submit RegionMigrateProcedure failed, because RegionGroup: %s
doesn't exist",
+ regionId);
+ LOGGER.warn(msg);
+ throw new ProcedureException(msg);
+ }
- TDataNodeLocation originalDataNode =
- configManager
- .getNodeManager()
- .getRegisteredDataNode(migrateRegionReq.getFromId())
- .getLocation();
- TDataNodeLocation destDataNode =
- configManager
- .getNodeManager()
- .getRegisteredDataNode(migrateRegionReq.getToId())
- .getLocation();
-
+ private TSStatus checkRegionMigrate(
+ TMigrateRegionReq migrateRegionReq,
+ TConsensusGroupId regionGroupId,
+ TDataNodeLocation originalDataNode,
+ TDataNodeLocation destDataNode,
+ TDataNodeLocation coordinatorForAddPeer) {
+ String failMessage = null;
if (originalDataNode == null) {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because no original DataNode
{}",
- migrateRegionReq.getFromId());
- TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "Submit RegionMigrateProcedure failed, because no original DataNode "
- + migrateRegionReq.getFromId());
- return status;
+ failMessage =
+ String.format(
+ "Submit RegionMigrateProcedure failed, because no original
DataNode %d",
+ migrateRegionReq.getFromId());
} else if (destDataNode == null) {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because no target DataNode
{}",
- migrateRegionReq.getToId());
- TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "Submit RegionMigrateProcedure failed, because no target DataNode "
- + migrateRegionReq.getToId());
- return status;
+ failMessage =
+ String.format(
+ "Submit RegionMigrateProcedure failed, because no target
DataNode %s",
+ migrateRegionReq.getToId());
+ } else if (coordinatorForAddPeer == null) {
+ failMessage =
+ String.format(
+ "%s, There are no other DataNodes could be selected to perform
the add peer process, "
+ + "please check RegionGroup: %s by show regions sql command",
+ REGION_MIGRATE_PROCESS, regionGroupId);
} else if (configManager.getPartitionManager()
.getAllReplicaSets(originalDataNode.getDataNodeId()).stream()
.noneMatch(replicaSet ->
replicaSet.getRegionId().equals(regionGroupId))) {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because the original DataNode
{} doesn't contain Region {}",
- migrateRegionReq.getFromId(),
- migrateRegionReq.getRegionId());
- TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "Submit RegionMigrateProcedure failed, because the original DataNode
"
- + migrateRegionReq.getFromId()
- + " doesn't contain Region "
- + migrateRegionReq.getRegionId());
- return status;
+ failMessage =
+ String.format(
+ "Submit RegionMigrateProcedure failed, because the original
DataNode %s doesn't contain Region %s",
+ migrateRegionReq.getFromId(), migrateRegionReq.getRegionId());
} else if
(configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId())
.stream()
.anyMatch(replicaSet ->
replicaSet.getRegionId().equals(regionGroupId))) {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because the target DataNode
{} already contains Region {}",
- migrateRegionReq.getToId(),
- migrateRegionReq.getRegionId());
- TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "Submit RegionMigrateProcedure failed, because the target DataNode "
- + migrateRegionReq.getToId()
- + " already contains Region "
- + migrateRegionReq.getRegionId());
- return status;
- }
- // Here we only check Running DataNode to implement migration, because
removing nodes may not
- // exist when add peer is performing
- Set<Integer> aliveDataNodes =
-
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
- .map(TDataNodeConfiguration::getLocation)
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toSet());
- if (NodeStatus.Unknown.equals(
+ failMessage =
+ String.format(
+ "Submit RegionMigrateProcedure failed, because the target
DataNode %s already contains Region %s",
+ migrateRegionReq.getToId(), migrateRegionReq.getRegionId());
+ } else if (NodeStatus.Unknown.equals(
configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because the sourceDataNode {}
is Unknown.",
- migrateRegionReq.getFromId());
- TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "Submit RegionMigrateProcedure failed, because the sourceDataNode "
- + migrateRegionReq.getFromId()
- + " is Unknown.");
- return status;
+ failMessage =
+ String.format(
+ "Submit RegionMigrateProcedure failed, because the
sourceDataNode %s is Unknown.",
+ migrateRegionReq.getFromId());
Review Comment:
support sourceDataNode unknown
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java:
##########
@@ -392,6 +392,30 @@ public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
}
+ public void resetPeerList(ConsensusGroupId groupId, List<Peer> peers) throws
ConsensusException {
+ IoTConsensusServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ if (impl.isReadOnly()) {
+ throw new ConsensusException("system is in read-only status now");
+ } else if (!impl.isActive()) {
+ throw new ConsensusException(
+ "peer is inactive and not ready to receive reset configuration
request.");
+ } else {
+ for (Peer peer : impl.getConfiguration()) {
+ if (!peers.contains(peer)) {
+ try {
+ removeRemotePeer(groupId, peer);
Review Comment:
consider whether this is usable
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/RootProcedureStack.java:
##########
@@ -74,6 +74,7 @@ protected synchronized void unsetRollback() {
state = State.FAILED;
}
+ // TODO: check whether this method return sub-procedures from every level
Review Comment:
cc @CRZbulabula
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java:
##########
@@ -42,29 +39,22 @@
import java.nio.ByteBuffer;
import java.util.Objects;
-import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
-import static
org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint;
-import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
-
/** Region migrate procedure */
public class RegionMigrateProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv,
RegionTransitionState> {
// TODO: Reach an agreement on RegionMigrateProcedure
Review Comment:
remove todo?
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.utils;
+
+public enum DataNodeKillPoints {
Review Comment:
@TestOnly
##########
iotdb-protocol/thrift-commons/src/main/thrift/common.thrift:
##########
@@ -90,12 +90,28 @@ struct TDataNodeConfiguration {
2: required TNodeResource resource
}
+// TODO: deprecated
enum TRegionMigrateFailedType {
Review Comment:
remove this field
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]