THUMarkLau commented on a change in pull request #5316:
URL: https://github.com/apache/iotdb/pull/5316#discussion_r834020060



##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,483 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
 public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
+  private final String fullStorageGroupName;
+  private final TsFileManager tsFileManager;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      TsFileManager tsFileManager,
+      File logFile,
+      boolean isInnerSpace) {
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
+    this.fullStorageGroupName = logicalStorageGroupName + "-" + 
virtualStorageGroupName;
     this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  public void doCompaction() {
+    boolean handleSuccess = true;

Review comment:
       It would be better to rename `handleSuccess` to `recoverSuccess`.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java
##########
@@ -59,22 +59,16 @@ public AbstractCrossSpaceCompactionTask getCompactionTask(
     }
   }
 
-  public AbstractCrossSpaceCompactionTask getCompactionRecoverTask(
+  public CompactionRecoverTask getCompactionRecoverTask(
       String logicalStorageGroupName,
       String virtualStorageGroupName,
-      long timePartitionId,
       File logFile,
       TsFileManager tsFileManager) {
     switch (this) {
       case REWRITE_COMPACTION:
       default:
-        return new RewriteCrossCompactionRecoverTask(

Review comment:
       Since both inner-space and cross-space compaction recovery are executed 
in `CompactionRecoverTask`, why not just create a new instance of 
`CompactionRecoverTask` in the recover process rather than using a factory method?

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,483 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
 public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
+  private final String fullStorageGroupName;
+  private final TsFileManager tsFileManager;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      TsFileManager tsFileManager,
+      File logFile,
+      boolean isInnerSpace) {
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
+    this.fullStorageGroupName = logicalStorageGroupName + "-" + 
virtualStorageGroupName;
     this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, 
compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to 
recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
+        if (isInnerSpace && 
compactionRecoverFromOld.isInnerCompactionLogBefore013()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && 
compactionRecoverFromOld.isCrossCompactionLogBefore013()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || 
sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", 
fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistBefore013(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(targetFileIdentifiers, 
sourceFileIdentifiers);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostBefore013(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithSomeSourceFilesLost(targetFileIdentifiers, 
sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set 
allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, 
delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log 
file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = 
DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target 
files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers, List<TsFileIdentifier> 
sourceFileIdentifiers) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target 
files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of 
data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new 
TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, 
Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction 
mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, 
resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(
+      List<TsFileIdentifier> targetFileIdentifiers, List<TsFileIdentifier> 
sourceFileIdentifiers)
+      throws IOException {
+    // some source files have been deleted, while target file must exist and 
complete.
+    if (!checkIsTargetFilesComplete(targetFileIdentifiers)) {
+      return false;
+    }
+
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file 
{}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            
CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, 
so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
         }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
+        }
+      }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + 
ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
+    }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in 
every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  private boolean checkIsTargetFilesComplete(List<TsFileIdentifier> 
targetFileIdentifiers)
+      throws IOException {
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      if (targetFile == null
+          || !TsFileUtils.isTsFileComplete(new 
TsFileResource(targetFile).getTsFile())) {
+        LOGGER.error(
+            "{} [Compaction][ExceptionHandler] target file {} is not complete, 
and some source files is lost, do nothing. Set allowCompaction to false",
+            fullStorageGroupName,
+            targetFileIdentifier.getFilePath());
+        IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+        return false;
       }
     }
+    return true;
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous 
version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  /**
+   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {
+
+    /** Return whether cross compaction log file is from previous version 
(<0.13). */
+    private boolean isCrossCompactionLogBefore013() {
+      return compactionLogFile
+          .getName()
+          .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
+    }
+
+    /** Return whether inner compaction log file is from previous version 
(<0.13). */
+    private boolean isInnerCompactionLogBefore013() {
+      return 
compactionLogFile.getName().startsWith(tsFileManager.getStorageGroupName());
+    }
+
+    /** Delete tmp target file and compaction mods file. */
+    private boolean handleCrossCompactionWithAllSourceFilesExistBefore013(
+        List<TsFileIdentifier> targetFileIdentifiers) {
+      // delete tmp target file
+      for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+        // xxx.tsfile.merge
+        File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+        if (tmpTargetFile != null) {
+          tmpTargetFile.delete();
+        }
+      }
+      File compactionModsFileFromOld =
+          new File(
+              tsFileManager.getStorageGroupDir()
+                  + File.separator
+                  + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
+      if (compactionModsFileFromOld.exists() && 
!compactionModsFileFromOld.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModsFileFromOld);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * 1. If target file does not exist, then move .merge file to target file 
<br>
+     * 2. If target resource file does not exist, then serialize it. <br>
+     * 3. Append merging modification to target mods file and delete merging 
mods file. <br>
+     * 4. Delete source files and .merge file. <br>
+     */
+    private boolean handleCrossCompactionWithSomeSourceFilesLostBefore013(
+        List<TsFileIdentifier> targetFileIdentifiers,
+        List<TsFileIdentifier> sourceFileIdentifiers) {
+      try {
+        File compactionModsFileFromOld =
+            new File(
+                tsFileManager.getStorageGroupDir()
+                    + File.separator
+                    + 
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
+        List<TsFileResource> targetFileResources = new ArrayList<>();
+        for (int i = 0; i < sourceFileIdentifiers.size(); i++) {
+          TsFileIdentifier sourceFileIdentifier = sourceFileIdentifiers.get(i);
+          if (sourceFileIdentifier.isSequence()) {
+            File tmpTargetFile = 
targetFileIdentifiers.get(i).getFileFromDataDirs();
+            File targetFile = null;
+
+            // move tmp target file to target file if not exist
+            if (tmpTargetFile != null) {
+              // move tmp target file to target file
+              String sourceFilePath =
+                  tmpTargetFile
+                      .getPath()
+                      .replace(
+                          TsFileConstant.TSFILE_SUFFIX
+                              + 
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
+                          TsFileConstant.TSFILE_SUFFIX);
+              targetFile = TsFileNameGenerator.increaseCrossCompactionCnt(new 
File(sourceFilePath));
+              FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, 
targetFile);
+            } else {
+              // target file must exist
+              File file =
+                  TsFileNameGenerator.increaseCrossCompactionCnt(
+                      new File(
+                          targetFileIdentifiers
+                              .get(i)
+                              .getFilePath()
+                              .replace(
+                                  TsFileConstant.TSFILE_SUFFIX
+                                      + 
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
+                                  TsFileConstant.TSFILE_SUFFIX)));
+
+              targetFile = getFileFromDataDirs(file.getPath());
+            }
+            if (targetFile == null) {
+              LOGGER.error(
+                  "{} [Compaction][Recover] target file of source seq file {} 
does not exist (<0.13).",
+                  fullStorageGroupName,
+                  sourceFileIdentifier.getFilePath());
+              return false;
+            }
+
+            // serialize target resource file if not exist
+            TsFileResource targetResource = new TsFileResource(targetFile);
+            if (!targetResource.resourceFileExists()) {
+              try (TsFileSequenceReader reader =
+                  new TsFileSequenceReader(targetFile.getAbsolutePath())) {
+                FileLoaderUtils.updateTsFileResource(reader, targetResource);
+              }
+              targetResource.serialize();
+            }
+
+            targetFileResources.add(targetResource);
+
+            // append compaction modifications to target mods file and delete 
compaction mods file
+            if (compactionModsFileFromOld.exists()) {
+              ModificationFile compactionModsFile =
+                  new ModificationFile(compactionModsFileFromOld.getPath());
+              appendCompactionModificationsBefore013(targetResource, 
compactionModsFile);
+            }
+
+            // delete tmp target file
+            if (tmpTargetFile != null) {
+              tmpTargetFile.delete();

Review comment:
       Check the return value of every `delete` call, or use 
`FileUtils.delete`, which throws an exception if the deletion fails. Here we 
can simply create a new `TsFileResource` instance and use its `remove` method to 
delete the file, the resource file and the modification file.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverManager.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.task;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import 
org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static 
org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD;
+
+/**
+ * CompactionRecoverManager searches compaction log and call {@link 
CompactionRecoverTask} to
+ * execute the recover process for all compaction task sequentially, including 
InnerCompactionTask
+ * in sequence/unsequence space, CrossSpaceCompaction.
+ */
+public class CompactionRecoverManager {
+  private static final Logger logger =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final TsFileManager tsFileManager;
+  private final String logicalStorageGroupName;
+  private final String virtualStorageGroupId;
+
+  public CompactionRecoverManager(
+      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
+    this.tsFileManager = tsFileManager;
+    this.logicalStorageGroupName = logicalStorageGroupName;
+    this.virtualStorageGroupId = virtualStorageGroupId;
+  }
+
+  public void recoverInnerSpaceCompaction(boolean isSequence) {
+    // search compaction log for SizeTieredCompaction
+    List<String> dirs;
+    if (isSequence) {
+      dirs = DirectoryManager.getInstance().getAllSequenceFileFolders();
+    } else {
+      dirs = DirectoryManager.getInstance().getAllUnSequenceFileFolders();
+    }
+    for (String dir : dirs) {
+      File storageGroupDir =
+          new File(
+              dir
+                  + File.separator
+                  + logicalStorageGroupName
+                  + File.separator
+                  + virtualStorageGroupId);
+      if (!storageGroupDir.exists()) {
+        return;
+      }
+      File[] timePartitionDirs = storageGroupDir.listFiles();
+      if (timePartitionDirs == null) {
+        return;
+      }
+      for (File timePartitionDir : timePartitionDirs) {
+        if (!timePartitionDir.isDirectory()
+            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
+          continue;
+        }
+        File[] compactionLogs =
+            
InnerSpaceCompactionUtils.findInnerSpaceCompactionLogs(timePartitionDir.getPath());
+        for (File compactionLog : compactionLogs) {
+          IoTDBDescriptor.getInstance()
+              .getConfig()
+              .getInnerCompactionStrategy()
+              .getCompactionRecoverTask(
+                  tsFileManager.getStorageGroupName(),
+                  tsFileManager.getVirtualStorageGroup(),
+                  compactionLog,
+                  tsFileManager)
+              .doCompaction();
+        }
+      }
+    }
+
+    // search compaction log for old LevelCompaction
+    File logFile =
+        FSFactoryProducer.getFSFactory()
+            .getFile(
+                tsFileManager.getStorageGroupDir(),
+                logicalStorageGroupName + 
INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD);
+    if (logFile.exists()) {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .getInnerCompactionStrategy()
+          .getCompactionRecoverTask(
+              tsFileManager.getStorageGroupName(),
+              tsFileManager.getVirtualStorageGroup(),
+              logFile,
+              tsFileManager)
+          .doCompaction();
+    }
+  }
+
+  public void recoverCrossSpaceCompaction() {
+    logger.info("recovering cross compaction");
+    recoverCrossCompactionBefore013();

Review comment:
       Please unify the styles of inner-space recovery and cross-space recovery

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionExceptionHandler.java
##########
@@ -182,38 +194,40 @@ private static boolean handleWhenAllSourceFilesExist(
   }
 
   /**
-   * Some source files are lost, check if the compaction has finished. If the 
compaction has
-   * finished, delete the remaining source files and compaction mods files. If 
the compaction has
-   * not finished, set the allowCompaction in tsFileManager to false and print 
some error logs.
+   * Some source files are lost, check if all target files are complete. If 
all target files are
+   * complete, delete the remaining source files and compaction mods files. If 
some target files are
+   * not complete, set the allowCompaction in tsFileManager to false and print 
some error logs.
    */
-  public static boolean handleWhenSomeSourceFilesLost(
-      String storageGroup,
-      List<TsFileResource> seqFileList,
-      List<TsFileResource> unseqFileList,
-      List<TsFileResource> targetFileList,
-      List<TsFileResource> lostSourceFiles)
+  private static boolean handleWhenSomeSourceFilesLost(
+      List<TsFileResource> targetResourceList,
+      List<TsFileResource> sourceSeqResourceList,
+      List<TsFileResource> sourceUnseqResourceList,
+      List<TsFileResource> lostSourceResourceList,
+      String fullStorageGroupName)
       throws IOException {
-    if (!checkIsTargetFilesComplete(targetFileList, lostSourceFiles, 
storageGroup)) {
+    // check whether is all target files complete
+    if (!checkIsTargetFilesComplete(
+        targetResourceList, lostSourceResourceList, fullStorageGroupName)) {
       return false;
     }
 
     // delete source files
-    for (TsFileResource unseqFile : unseqFileList) {
-      unseqFile.remove();
-      unseqFile.setStatus(TsFileResourceStatus.DELETED);
+    for (TsFileResource resource : sourceSeqResourceList) {
+      resource.setStatus(TsFileResourceStatus.DELETED);
+      resource.remove();

Review comment:
       remove the tsfile resource from the `TsFileResourceManager`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to