chengjianyun commented on a change in pull request #5316:
URL: https://github.com/apache/iotdb/pull/5316#discussion_r831943804



##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,481 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task 
sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
-public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+/** CompactionRecoverTask executes the recover process for all compaction 
tasks. */
+public class CompactionRecoverTask extends AbstractCompactionTask {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String 
virtualStorageGroupId) {
-    this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      long timePartition,
+      TsFileManager tsFileManager,
+      AtomicInteger currentTaskNum,
+      File logFile,
+      boolean isInnerSpace) {
+    super(
+        logicalStorageGroupName + "-" + virtualStorageGroupName,
+        timePartition,
+        tsFileManager,
+        currentTaskNum);
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  @Override
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, 
compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to 
recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
+        if (isInnerSpace && 
compactionRecoverFromOld.isOldInnerCompactionLog()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && 
compactionRecoverFromOld.isOldCrossCompactionLog()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || 
sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", 
fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistFromOld(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(
+                    targetFileIdentifiers, sourceFileIdentifiers, 
isInnerSpace);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostFromOld(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess = 
handleWithSomeSourceFilesLost(sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set 
allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, 
delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log 
file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = 
DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target 
files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers,
+      List<TsFileIdentifier> sourceFileIdentifiers,
+      boolean isInnerSpace) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target 
files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of 
data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || 
!Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new 
TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, 
Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction 
mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, 
resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(List<TsFileIdentifier> 
sourceFileIdentifiers) {
+    // some source files have been deleted, while target file must exist.
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file 
{}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            
CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, 
so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
+        }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this 
may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
         }
       }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + 
ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may 
cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
     }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in 
every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof CompactionRecoverTask) {
+      return compactionLogFile.equals(((CompactionRecoverTask) 
otherTask).compactionLogFile);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean checkValidAndSetMerging() {
+    return compactionLogFile.exists();
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous 
version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  @Override
+  public void setSourceFilesToCompactionCandidate() {
+    // do nothing
+  }
+
+  @Override
+  public void resetCompactionCandidateStatusForAllSourceFiles() {
+    // do nothing
+  }
+
+  /**
+   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {

Review comment:
       Looks like the nested class is unnecessary. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to