chengjianyun commented on a change in pull request #5316:
URL: https://github.com/apache/iotdb/pull/5316#discussion_r831944438



##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,481 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
-public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
+public class CompactionRecoverTask extends AbstractCompactionTask {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
-    this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      long timePartition,
+      TsFileManager tsFileManager,
+      AtomicInteger currentTaskNum,
+      File logFile,
+      boolean isInnerSpace) {
+    super(
+        logicalStorageGroupName + "-" + virtualStorageGroupName,
+        timePartition,
+        tsFileManager,
+        currentTaskNum);
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  @Override
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, 
compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to 
recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
+        if (isInnerSpace && 
compactionRecoverFromOld.isOldInnerCompactionLog()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && 
compactionRecoverFromOld.isOldCrossCompactionLog()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || 
sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", 
fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistFromOld(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(
+                    targetFileIdentifiers, sourceFileIdentifiers, 
isInnerSpace);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostFromOld(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess = 
handleWithSomeSourceFilesLost(sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set 
allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, 
delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log 
file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = 
DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target 
files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers,
+      List<TsFileIdentifier> sourceFileIdentifiers,
+      boolean isInnerSpace) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target 
files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of 
data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new 
TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, 
Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction 
mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, 
resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(List<TsFileIdentifier> 
sourceFileIdentifiers) {
+    // some source files have been deleted, while target file must exist.
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file 
{}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            
CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, 
so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
+        }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
         }
       }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + 
ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
     }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in 
every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof CompactionRecoverTask) {
+      return compactionLogFile.equals(((CompactionRecoverTask) 
otherTask).compactionLogFile);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean checkValidAndSetMerging() {
+    return compactionLogFile.exists();
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous 
version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  @Override
+  public void setSourceFilesToCompactionCandidate() {
+    // do nothing
+  }
+
+  @Override
+  public void resetCompactionCandidateStatusForAllSourceFiles() {
+    // do nothing
+  }
+
+  /**
+   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {
+
+    /** Return whether cross compaction log file is from previous version 
(<0.13). */
+    private boolean isOldCrossCompactionLog() {

Review comment:
       How about renaming this to `isCrossCompactionLogBefore013`?

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,481 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
-public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
+public class CompactionRecoverTask extends AbstractCompactionTask {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
-    this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      long timePartition,
+      TsFileManager tsFileManager,
+      AtomicInteger currentTaskNum,
+      File logFile,
+      boolean isInnerSpace) {
+    super(
+        logicalStorageGroupName + "-" + virtualStorageGroupName,
+        timePartition,
+        tsFileManager,
+        currentTaskNum);
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  @Override
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, 
compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to 
recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
+        if (isInnerSpace && 
compactionRecoverFromOld.isOldInnerCompactionLog()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && 
compactionRecoverFromOld.isOldCrossCompactionLog()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || 
sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", 
fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistFromOld(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(
+                    targetFileIdentifiers, sourceFileIdentifiers, 
isInnerSpace);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostFromOld(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess = 
handleWithSomeSourceFilesLost(sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set 
allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, 
delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log 
file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = 
DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target 
files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers,
+      List<TsFileIdentifier> sourceFileIdentifiers,
+      boolean isInnerSpace) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target 
files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of 
data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new 
TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, 
Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction 
mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, 
resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(List<TsFileIdentifier> 
sourceFileIdentifiers) {
+    // some source files have been deleted, while target file must exist.
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file 
{}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            
CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, 
so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
+        }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
         }
       }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + 
ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
     }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in 
every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof CompactionRecoverTask) {
+      return compactionLogFile.equals(((CompactionRecoverTask) 
otherTask).compactionLogFile);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean checkValidAndSetMerging() {
+    return compactionLogFile.exists();
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous 
version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  @Override
+  public void setSourceFilesToCompactionCandidate() {
+    // do nothing
+  }
+
+  @Override
+  public void resetCompactionCandidateStatusForAllSourceFiles() {
+    // do nothing
+  }
+
+  /**
+   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {
+
+    /** Return whether cross compaction log file is from previous version 
(<0.13). */
+    private boolean isOldCrossCompactionLog() {
+      return compactionLogFile
+          .getName()
+          .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
+    }
+
+    /** Return whether inner compaction log file is from previous version 
(<0.13). */
+    private boolean isOldInnerCompactionLog() {
+      return 
compactionLogFile.getName().startsWith(tsFileManager.getStorageGroupName());
+    }
+
+    /** Delete tmp target file and compaction mods file. */
+    private boolean handleCrossCompactionWithAllSourceFilesExistFromOld(

Review comment:
       The method should specify which versions it can handle.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,481 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
-public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
+public class CompactionRecoverTask extends AbstractCompactionTask {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
-    this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      long timePartition,
+      TsFileManager tsFileManager,
+      AtomicInteger currentTaskNum,
+      File logFile,
+      boolean isInnerSpace) {
+    super(
+        logicalStorageGroupName + "-" + virtualStorageGroupName,
+        timePartition,
+        tsFileManager,
+        currentTaskNum);
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  @Override
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, 
compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to 
recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
+        if (isInnerSpace && 
compactionRecoverFromOld.isOldInnerCompactionLog()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && 
compactionRecoverFromOld.isOldCrossCompactionLog()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || 
sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", 
fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistFromOld(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(
+                    targetFileIdentifiers, sourceFileIdentifiers, 
isInnerSpace);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostFromOld(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess = 
handleWithSomeSourceFilesLost(sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set 
allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, 
delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log 
file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = 
DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target 
files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers,
+      List<TsFileIdentifier> sourceFileIdentifiers,
+      boolean isInnerSpace) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target 
files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of 
data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new 
TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, 
Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction 
mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, 
resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(List<TsFileIdentifier> 
sourceFileIdentifiers) {
+    // some source files have been deleted, while target file must exist.
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file 
{}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            
CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, 
so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
+        }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
         }
       }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + 
ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
     }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in 
every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof CompactionRecoverTask) {
+      return compactionLogFile.equals(((CompactionRecoverTask) 
otherTask).compactionLogFile);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean checkValidAndSetMerging() {
+    return compactionLogFile.exists();
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous 
version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  @Override
+  public void setSourceFilesToCompactionCandidate() {
+    // do nothing
+  }
+
+  @Override
+  public void resetCompactionCandidateStatusForAllSourceFiles() {
+    // do nothing
+  }
+
+  /**
+   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {
+
+    /** Return whether cross compaction log file is from previous version 
(<0.13). */
+    private boolean isOldCrossCompactionLog() {
+      return compactionLogFile
+          .getName()
+          .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
+    }
+
+    /** Return whether inner compaction log file is from previous version 
(<0.13). */
+    private boolean isOldInnerCompactionLog() {
+      return 
compactionLogFile.getName().startsWith(tsFileManager.getStorageGroupName());
+    }
+
+    /** Delete tmp target file and compaction mods file. */
+    private boolean handleCrossCompactionWithAllSourceFilesExistFromOld(
+        List<TsFileIdentifier> targetFileIdentifiers) {
+      // delete tmp target file
+      for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+        // xxx.tsfile.merge
+        File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+        if (tmpTargetFile != null) {
+          tmpTargetFile.delete();
+        }
+      }
+      File compactionModsFileFromOld =
+          new File(
+              tsFileManager.getStorageGroupDir()
+                  + File.separator
+                  + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
+      if (compactionModsFileFromOld.exists() && 
!compactionModsFileFromOld.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModsFileFromOld);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * 1. If target file does not exist, then move .merge file to target file 
<br>
+     * 2. If target resource file does not exist, then serialize it. <br>
+     * 3. Append merging modification to target mods file and delete merging 
mods file. <br>
+     * 4. Delete source files and .merge file. <br>
+     */
+    private boolean handleCrossCompactionWithSomeSourceFilesLostFromOld(
+        List<TsFileIdentifier> targetFileIdentifiers,
+        List<TsFileIdentifier> sourceFileIdentifiers) {
+      try {
+        File compactionModsFileFromOld =
+            new File(
+                tsFileManager.getStorageGroupDir()
+                    + File.separator
+                    + 
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
+        List<TsFileResource> targetFileResources = new ArrayList<>();
+        for (int i = 0; i < sourceFileIdentifiers.size(); i++) {
+          TsFileIdentifier sourceFileIdentifier = sourceFileIdentifiers.get(i);
+          if (sourceFileIdentifier.isSequence()) {
+            File tmpTargetFile = 
targetFileIdentifiers.get(i).getFileFromDataDirs();
+            File targetFile = null;
+
+            // move tmp target file to target file if not exist
+            if (tmpTargetFile != null) {
+              // move tmp target file to target file
+              String sourceFilePath =
+                  tmpTargetFile
+                      .getPath()
+                      .replace(
+                          TsFileConstant.TSFILE_SUFFIX
+                              + 
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
+                          TsFileConstant.TSFILE_SUFFIX);
+              targetFile = TsFileNameGenerator.increaseCrossCompactionCnt(new 
File(sourceFilePath));
+              FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, 
targetFile);
+            } else {
+              // target file must exist
+              File file =
+                  TsFileNameGenerator.increaseCrossCompactionCnt(
+                      new File(
+                          targetFileIdentifiers
+                              .get(i)
+                              .getFilePath()
+                              .replace(
+                                  TsFileConstant.TSFILE_SUFFIX
+                                      + 
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
+                                  TsFileConstant.TSFILE_SUFFIX)));
+
+              targetFile = getFileFromDataDirs(file.getPath());
+            }
+            if (targetFile == null) {
+              LOGGER.error(
+                  "{} [Compaction][Recover] target file of source seq file {} 
does not exist (<0.13).",
+                  fullStorageGroupName,
+                  sourceFileIdentifier.getFilePath());
+              return false;
+            }
+
+            // serialize target resource file if not exist
+            TsFileResource targetResource = new TsFileResource(targetFile);
+            if (!targetResource.resourceFileExists()) {
+              try (TsFileSequenceReader reader =
+                  new TsFileSequenceReader(targetFile.getAbsolutePath())) {
+                FileLoaderUtils.updateTsFileResource(reader, targetResource);
+              }
+              targetResource.serialize();
+            }
+
+            targetFileResources.add(targetResource);
+
+            // append compaction modifications to target mods file and delete 
compaction mods file
+            if (compactionModsFileFromOld.exists()) {
+              ModificationFile compactionModsFile =
+                  new ModificationFile(compactionModsFileFromOld.getPath());
+              appendCompactionModificationsFromOld(targetResource, 
compactionModsFile);
+            }
+
+            // delete tmp target file
+            if (tmpTargetFile != null) {
+              tmpTargetFile.delete();
+            }
+          }
+
+          // delete source tsfile
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile != null) {
+            sourceFile.delete();
+          }
+
+          // delete source resource file
+          sourceFile =
+              getFileFromDataDirs(
+                  sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+          if (sourceFile != null) {
+            sourceFile.delete();
+          }
+
+          // delete source mods file
+          sourceFile =
+              getFileFromDataDirs(
+                  sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+          if (sourceFile != null) {
+            sourceFile.delete();
+          }
+        }
+
+        // delete compaction mods file
+        if (compactionModsFileFromOld.exists() && 
!compactionModsFileFromOld.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              compactionModsFileFromOld);
+          return false;
+        }
+      } catch (Throwable e) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to handle with some source files 
lost from old version.",
+            fullStorageGroupName,
+            e);
+        return false;
+      }
+
+      return true;
+    }
+
+    private void appendCompactionModificationsFromOld(

Review comment:
       Same as above: "Old" is not specific; the name should state which versions it applies to.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,481 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
-public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
+public class CompactionRecoverTask extends AbstractCompactionTask {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
-    this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      long timePartition,
+      TsFileManager tsFileManager,
+      AtomicInteger currentTaskNum,
+      File logFile,
+      boolean isInnerSpace) {
+    super(
+        logicalStorageGroupName + "-" + virtualStorageGroupName,
+        timePartition,
+        tsFileManager,
+        currentTaskNum);
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  @Override
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, 
compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to 
recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
+        if (isInnerSpace && 
compactionRecoverFromOld.isOldInnerCompactionLog()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && 
compactionRecoverFromOld.isOldCrossCompactionLog()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || 
sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", 
fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistFromOld(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(
+                    targetFileIdentifiers, sourceFileIdentifiers, 
isInnerSpace);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostFromOld(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess = 
handleWithSomeSourceFilesLost(sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set 
allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, 
delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log 
file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = 
DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target 
files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers,
+      List<TsFileIdentifier> sourceFileIdentifiers,
+      boolean isInnerSpace) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target 
files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of 
data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new 
TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, 
Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction 
mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, 
resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(List<TsFileIdentifier> 
sourceFileIdentifiers) {
+    // some source files have been deleted, while target file must exist.
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file 
{}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            
CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, 
so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
+        }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
         }
       }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + 
ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
     }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in 
every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof CompactionRecoverTask) {
+      return compactionLogFile.equals(((CompactionRecoverTask) 
otherTask).compactionLogFile);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean checkValidAndSetMerging() {
+    return compactionLogFile.exists();
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous 
version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  @Override
+  public void setSourceFilesToCompactionCandidate() {
+    // do nothing
+  }
+
+  @Override
+  public void resetCompactionCandidateStatusForAllSourceFiles() {
+    // do nothing
+  }
+
+  /**
+   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {
+
+    /** Return whether cross compaction log file is from previous version 
(<0.13). */
+    private boolean isOldCrossCompactionLog() {
+      return compactionLogFile
+          .getName()
+          .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
+    }
+
+    /** Return whether inner compaction log file is from previous version 
(<0.13). */
+    private boolean isOldInnerCompactionLog() {
+      return 
compactionLogFile.getName().startsWith(tsFileManager.getStorageGroupName());
+    }
+
+    /** Delete tmp target file and compaction mods file. */
+    private boolean handleCrossCompactionWithAllSourceFilesExistFromOld(
+        List<TsFileIdentifier> targetFileIdentifiers) {
+      // delete tmp target file
+      for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+        // xxx.tsfile.merge
+        File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+        if (tmpTargetFile != null) {
+          tmpTargetFile.delete();
+        }
+      }
+      File compactionModsFileFromOld =
+          new File(
+              tsFileManager.getStorageGroupDir()
+                  + File.separator
+                  + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
+      if (compactionModsFileFromOld.exists() && 
!compactionModsFileFromOld.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModsFileFromOld);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * 1. If target file does not exist, then move .merge file to target file 
<br>
+     * 2. If target resource file does not exist, then serialize it. <br>
+     * 3. Append merging modification to target mods file and delete merging 
mods file. <br>
+     * 4. Delete source files and .merge file. <br>
+     */
+    private boolean handleCrossCompactionWithSomeSourceFilesLostFromOld(

Review comment:
       Same as above.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,481 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
-public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
+public class CompactionRecoverTask extends AbstractCompactionTask {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
-    this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      long timePartition,
+      TsFileManager tsFileManager,
+      AtomicInteger currentTaskNum,
+      File logFile,
+      boolean isInnerSpace) {
+    super(
+        logicalStorageGroupName + "-" + virtualStorageGroupName,
+        timePartition,
+        tsFileManager,
+        currentTaskNum);
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  @Override
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, 
compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to 
recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
+        if (isInnerSpace && 
compactionRecoverFromOld.isOldInnerCompactionLog()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && 
compactionRecoverFromOld.isOldCrossCompactionLog()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || 
sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", 
fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistFromOld(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(
+                    targetFileIdentifiers, sourceFileIdentifiers, 
isInnerSpace);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostFromOld(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess = 
handleWithSomeSourceFilesLost(sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set 
allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, 
delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log 
file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = 
DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target 
files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers,
+      List<TsFileIdentifier> sourceFileIdentifiers,
+      boolean isInnerSpace) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target 
files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of 
data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new 
TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, 
Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction 
mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, 
resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(List<TsFileIdentifier> 
sourceFileIdentifiers) {
+    // some source files have been deleted, while target file must exist.
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file 
{}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            
CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, 
so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
+        }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
         }
       }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + 
ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
     }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in 
every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof CompactionRecoverTask) {
+      return compactionLogFile.equals(((CompactionRecoverTask) 
otherTask).compactionLogFile);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean checkValidAndSetMerging() {
+    return compactionLogFile.exists();
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous 
version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  @Override
+  public void setSourceFilesToCompactionCandidate() {
+    // do nothing
+  }
+
+  @Override
+  public void resetCompactionCandidateStatusForAllSourceFiles() {
+    // do nothing
+  }
+
+  /**
+   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {
+
+    /** Return whether cross compaction log file is from previous version 
(<0.13). */
+    private boolean isOldCrossCompactionLog() {
+      return compactionLogFile
+          .getName()
+          .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
+    }
+
+    /** Return whether inner compaction log file is from previous version 
(<0.13). */
+    private boolean isOldInnerCompactionLog() {

Review comment:
       How about renaming this method to `isInnerCompactionLogBefore013`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to