OneSizeFitsQuorum commented on code in PR #12293:
URL: https://github.com/apache/iotdb/pull/12293#discussion_r1559153315


##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java:
##########
@@ -103,7 +103,11 @@ public void open() throws TTransportException {
 
   @Override
   public void close() {
-    underlying.close();
+    try {
+      underlying.close();
+    } catch (Exception ignore) {
+

Review Comment:
   same



##########
iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java:
##########
@@ -185,8 +185,8 @@ private void closeClientOperation() throws SQLException {
         RpcUtils.verifySuccess(closeResp);
         stmtId = -1;
       }
-    } catch (Exception e) {
-      throw new SQLException("Error occurs when closing statement.", e);
+    } catch (Exception ignore) {

Review Comment:
   same



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java:
##########
@@ -348,27 +348,30 @@ public TSStatus deleteOldRegionPeer(
   }
 
   public TSStatus resetPeerList(
-      TConsensusGroupId regionId, List<TDataNodeLocation> 
correctDataNodeLocations) {
-    Optional<TDataNodeLocation> optional = 
filterDataNodeWithOtherRegionReplica(regionId, null);
-    TDataNodeLocation selectDataNode = optional.get();
+      TConsensusGroupId regionId,
+      List<TDataNodeLocation> correctDataNodeLocations,
+      TDataNodeLocation target) {
     TSStatus status =

Review Comment:
   try to use sendAsyncRequestToDataNode



##########
iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java:
##########
@@ -155,9 +155,8 @@ public void close() throws SQLException {
     TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
     try {
       getClient().closeSession(req);
-    } catch (TException e) {

Review Comment:
   recover



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java:
##########
@@ -79,39 +82,31 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, 
AddRegionPeerState s
       outerSwitch:
       switch (state) {
         case CREATE_NEW_REGION_PEER:
-          handler.createNewRegionPeer(consensusGroupId, destDataNode);
+          TSStatus status = handler.createNewRegionPeer(consensusGroupId, 
destDataNode);
           setKillPoint(state);
+          if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
+            rollback(env, handler);
+          }
           setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
           break;
         case DO_ADD_REGION_PEER:
-          TSStatus tsStatus =
-              handler.addRegionPeer(this.getProcId(), destDataNode, 
consensusGroupId, coordinator);
-          setKillPoint(state);
-          TRegionMigrateResult result;
-          if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            result = handler.waitTaskFinish(this.getProcId(), coordinator);
-          } else {
-            throw new ProcedureException("ADD_REGION_PEER executed failed in 
DataNode");
+          if (!this.isStateDeserialized()) {

Review Comment:
   Add some comments for the corner case.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java:
##########
@@ -145,6 +140,51 @@ protected Flow executeFromState(ConfigNodeProcedureEnv 
env, AddRegionPeerState s
     return Flow.HAS_MORE_STATE;
   }
 
+  private void rollback(ConfigNodeProcedureEnv env, RegionMaintainHandler 
handler) {
+    List<TDataNodeLocation> correctDataNodeLocations =
+        
env.getConfigManager().getPartitionManager().getAllReplicaSets().stream()
+            .filter(tRegionReplicaSet -> 
tRegionReplicaSet.getRegionId().equals(consensusGroupId))
+            .findAny()
+            .get()
+            .getDataNodeLocations();
+
+    String correctStr =
+        correctDataNodeLocations.stream()
+            .map(TDataNodeLocation::getDataNodeId)
+            .collect(Collectors.toList())
+            .toString();
+    List<TDataNodeLocation> relatedDataNodeLocations = new 
ArrayList<>(correctDataNodeLocations);
+    relatedDataNodeLocations.add(destDataNode);
+    LOGGER.info(
+        "Will reset peer list of consensus group {} on DataNode {}",
+        consensusGroupId,
+        relatedDataNodeLocations.stream()
+            .map(TDataNodeLocation::getDataNodeId)
+            .collect(Collectors.toList()));
+    relatedDataNodeLocations
+        .parallelStream()

Review Comment:
   Try not to use the ForkJoinPool here.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java:
##########
@@ -457,55 +454,30 @@ private void sendRegisterRequestToConfigNode() throws 
StartupException, IOExcept
     }
   }
 
-  // TODO: Implement in IConsensus, not in DataNode
-  private List<ConsensusGroupId> getConsensusGroupId() {
-    List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
-    String dataRegionConsensusDir = config.getDataRegionConsensusDir();
-    if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
-      return consensusGroupIds;
-    }
-    try (DirectoryStream<Path> stream =
-        Files.newDirectoryStream(new File(dataRegionConsensusDir).toPath())) {
-      for (Path path : stream) {
-        String[] items = path.getFileName().toString().split("_");
-        ConsensusGroupId consensusGroupId =
-            ConsensusGroupId.Factory.create(Integer.parseInt(items[0]), 
Integer.parseInt(items[1]));
-        consensusGroupIds.add(consensusGroupId);
-      }
-    } catch (IOException e) {
-      logger.error("Cannot get consensus group id from {}", 
dataRegionConsensusDir, e);
-    }
-    return consensusGroupIds;
-  }
-
-  // TODO: remove for current version, add todo for rename
-  private void renameInvalidRegionDirs(List<ConsensusGroupId> 
invalidConsensusGroupIds) {
-    for (ConsensusGroupId consensusGroupId : invalidConsensusGroupIds) {
-      File oldDir =
-          new File(
-              IoTConsensus.buildPeerDir(
-                  new File(config.getDataRegionConsensusDir()), 
consensusGroupId));
-      File newDir =
-          new File(
-              IoTConsensus.buildPeerDir(
-                  new File(config.getInvalidDataRegionConsensusDir()), 
consensusGroupId));
-      if (oldDir.exists() && !FileUtils.moveFileSafe(oldDir, newDir)) {
-        logger.error("move {} to {} failed.", oldDir.getAbsolutePath(), 
newDir.getAbsolutePath());
-        try {
-          FileUtils.recursivelyDeleteFolder(oldDir.getPath());
-        } catch (IOException e) {
-          logger.error("delete {} failed.", oldDir.getAbsolutePath());
-        }
-      }
-    }
-  }
-
   private void removeInvalidRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
     List<ConsensusGroupId> invalidConsensusGroupIds =
-        getConsensusGroupId().stream()
+        
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsFromDisk().stream()
             .filter(consensusGroupId -> 
!dataNodeConsensusGroupIds.contains(consensusGroupId))
             .collect(Collectors.toList());
-    renameInvalidRegionDirs(invalidConsensusGroupIds);
+    if (!invalidConsensusGroupIds.isEmpty()) {
+      logger.info("Remove invalid region directories... {}", 
invalidConsensusGroupIds);
+      for (ConsensusGroupId consensusGroupId : invalidConsensusGroupIds) {
+        File oldDir =
+            new File(

Review Comment:
   Consider RatisConsensus and SimpleConsensus too? Maybe add another function to 
IConsensus?



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java:
##########
@@ -687,6 +687,12 @@ public List<ConsensusGroupId> getAllConsensusGroupIds() {
     return ids;
   }
 
+  @Override
+  public List<ConsensusGroupId> getAllConsensusGroupIdsFromDisk() {
+    logger.warn("RatisConsensus does not support 
getAllConsensusGroupIdsFromDisk");
+    return Collections.emptyList();

Review Comment:
   ?



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java:
##########
@@ -207,4 +207,11 @@ public interface IConsensus {
    * @return consensusGroupId list
    */
   List<ConsensusGroupId> getAllConsensusGroupIds();
+
+  /**
+   * Return all consensus group ids from disk.
+   *
+   * @return consensusGroupId list
+   */
+  List<ConsensusGroupId> getAllConsensusGroupIdsFromDisk();

Review Comment:
   getAllConsensusGroupIdsWithoutStarting()



##########
iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift:
##########
@@ -118,6 +118,7 @@ struct TCleanupTransferredSnapshotRes {
 service IoTConsensusIService {
   TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req)
   TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
+  TInactivatePeerRes inactivatePeerForDeletionPurpose(TInactivatePeerReq req)

Review Comment:
   why add new function?



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -591,23 +614,12 @@ public void removeSyncLogChannel(Peer targetPeer) throws 
ConsensusGroupModifyPee
     }
   }
 
-  // TODO: persist first and then delete old configuration file
   public void persistConfiguration() {
     try {
-      try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
-        stream
-            .filter(Files::isRegularFile)
-            .filter(filePath -> 
filePath.getFileName().toString().contains("configuration"))
-            .forEach(
-                filePath -> {
-                  try {
-                    Files.delete(filePath);
-                  } catch (IOException e) {
-                    logger.error("Unexpected error occurs when deleting old 
configuration file", e);
-                  }
-                });
-      }
-      serializeConfigurationAndFsyncToDisk();
+      serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
+      deleteConfiguration(CONFIGURATION_FILE_NAME);
+      serializeConfigurationAndFsyncToDisk(CONFIGURATION_FILE_NAME);

Review Comment:
   same



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java:
##########
@@ -207,4 +207,11 @@ public interface IConsensus {
    * @return consensusGroupId list
    */
   List<ConsensusGroupId> getAllConsensusGroupIds();
+
+  /**
+   * Return all consensus group ids from disk.

Review Comment:
   Add a comment along these lines:
   We need to parse all the RegionGroupIds from the disk directory before 
starting the consensus layer. At that point getAllConsensusGroupIds returns an 
empty list, so we need to add a new interface.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java:
##########
@@ -79,39 +82,31 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, 
AddRegionPeerState s
       outerSwitch:
       switch (state) {
         case CREATE_NEW_REGION_PEER:
-          handler.createNewRegionPeer(consensusGroupId, destDataNode);
+          TSStatus status = handler.createNewRegionPeer(consensusGroupId, 
destDataNode);
           setKillPoint(state);
+          if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
+            rollback(env, handler);
+          }
           setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
           break;
         case DO_ADD_REGION_PEER:
-          TSStatus tsStatus =
-              handler.addRegionPeer(this.getProcId(), destDataNode, 
consensusGroupId, coordinator);
-          setKillPoint(state);
-          TRegionMigrateResult result;
-          if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            result = handler.waitTaskFinish(this.getProcId(), coordinator);
-          } else {
-            throw new ProcedureException("ADD_REGION_PEER executed failed in 
DataNode");
+          if (!this.isStateDeserialized()) {
+            TSStatus tsStatus =
+                handler.submitAddRegionPeerTask(
+                    this.getProcId(), destDataNode, consensusGroupId, 
coordinator);
+            setKillPoint(state);
+            if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
+              throw new ProcedureException("ADD_REGION_PEER executed failed in 
DataNode");
+            }
           }
+          TRegionMigrateResult result = 
handler.waitTaskFinish(this.getProcId(), coordinator);
           switch (result.getTaskStatus()) {
             case TASK_NOT_EXIST:
               // coordinator crashed and lost its task table
             case FAIL:
               // maybe some DataNode crash
-              LOGGER.warn(
-                  "result is {}, will use resetPeerList to clean in the 
future",
-                  result.getTaskStatus());
-              //              List<TDataNodeLocation> correctDataNodeLocations 
=
-              //
-              // 
env.getConfigManager().getPartitionManager().getAllReplicaSets().stream()
-              //                      .filter(
-              //                          tRegionReplicaSet ->
-              //
-              // tRegionReplicaSet.getRegionId().equals(consensusGroupId))
-              //                      .findAny()
-              //                      .get()
-              //                      .getDataNodeLocations();
-              //              handler.resetPeerList(consensusGroupId, 
correctDataNodeLocations);
+              LOGGER.warn("result is {}, resetPeerList", 
result.getTaskStatus());

Review Comment:
   enhance description



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java:
##########
@@ -287,15 +287,16 @@ public DataSet read(IConsensusRequest request) {
 
   @Override
   public File getSnapshotRoot() {
-    String snapshotDir =
-        
IoTDBDescriptor.getInstance().getConfig().getRatisDataRegionSnapshotDir()
-            + File.separator
-            + region.getDatabaseName()
-            + "-"
-            + region.getDataRegionId();
+    String snapshotDir = "null";

Review Comment:
   Should the default be ""? Or just use null.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java:
##########
@@ -287,15 +287,16 @@ public DataSet read(IConsensusRequest request) {
 
   @Override
   public File getSnapshotRoot() {
-    String snapshotDir =
-        
IoTDBDescriptor.getInstance().getConfig().getRatisDataRegionSnapshotDir()
-            + File.separator
-            + region.getDatabaseName()
-            + "-"
-            + region.getDataRegionId();
+    String snapshotDir = "null";
     try {
+      snapshotDir =
+          
IoTDBDescriptor.getInstance().getConfig().getRatisDataRegionSnapshotDir()
+              + File.separator
+              + region.getDatabaseName()
+              + "-"
+              + region.getDataRegionId();
       return new File(snapshotDir).getCanonicalFile();
-    } catch (IOException e) {
+    } catch (IOException | NullPointerException e) {

Review Comment:
   why catch NullPointerException?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to