qiaojialin commented on a change in pull request #4498: URL: https://github.com/apache/iotdb/pull/4498#discussion_r767156669
########## File path: server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.inner; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileManager; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This class is used to handle exception (including OOM error) occurred during compaction. The + * <i>allowCompaction</i> flag in {@link org.apache.iotdb.db.engine.storagegroup.TsFileManager} may + * be set to false if exception cannot be handled correctly(such as OOM during handling exception), + * after which the subsequent compaction in this storage group will not be carried out. Under some + * serious circumstances(such as data lost), the system will be set to read-only. + */ +public class InnerSpaceCompactionExceptionHandler { + private static final Logger LOGGER = LoggerFactory.getLogger("COMPACTION"); + + public static void handleException( + String fullStorageGroupName, + File logFile, + TsFileResource targetTsFile, + List<TsFileResource> selectedTsFileResourceList, + TsFileManager tsFileManager, + TsFileResourceList tsFileResourceList) { + + if (!logFile.exists()) { + // log file does not exist + // it means the compaction has not started yet + // we need not to handle it + return; + } + + boolean handleSuccess = true; + + List<TsFileResource> lostSourceFiles = new ArrayList<>(); + boolean allSourceFileExist = + checkAllSourceFilesExist(selectedTsFileResourceList, lostSourceFiles); + + if (allSourceFileExist) { + handleSuccess = + handleWhenAllSourceFilesExist( + fullStorageGroupName, targetTsFile, selectedTsFileResourceList, tsFileResourceList); + } else { + // some source file does not exists + // it means we start to delete source file + LOGGER.info( + "{} [Compaction][ExceptionHandler] some source files {} is lost", + fullStorageGroupName, + lostSourceFiles); + if (!targetTsFile.getTsFile().exists()) { + // some source files are missed, and target file not exists + // some data is lost, set the system to read-only + LOGGER.warn( + "{} [Compaction][ExceptionHandler] target file {} does not exist either, do nothing. Set system to read-only", + fullStorageGroupName, + targetTsFile); + IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); + handleSuccess = false; + } else { + handleSuccess = + handleWhenSomeSourceFilesLost( + fullStorageGroupName, + targetTsFile, + selectedTsFileResourceList, + tsFileResourceList, + lostSourceFiles); + } + } + + if (handleSuccess) { + LOGGER.error( + "{} [Compaction][ExceptionHandler] Failed to handle exception, set allowCompaction to false", + fullStorageGroupName); + tsFileManager.setAllowCompaction(false); + } else { + LOGGER.info( + "{} [Compaction][ExceptionHandler] Handle exception successfully, delete log file {}", + fullStorageGroupName, + logFile); + try { + FileUtils.delete(logFile); + } catch (IOException e) { + LOGGER.error( + "{} [Compaction][ExceptionHandler] Exception occurs while deleting log file {}, set allowCompaction to false", + fullStorageGroupName, + logFile, + e); + tsFileManager.setAllowCompaction(false); + } + } + } + + private static boolean checkAllSourceFilesExist( + List<TsFileResource> sourceFiles, List<TsFileResource> lostSourceFiles) { + boolean allSourceFileExist = true; + for (TsFileResource sourceTsFile : sourceFiles) { + if (!sourceTsFile.getTsFile().exists()) { + allSourceFileExist = false; + lostSourceFiles.add(sourceTsFile); + } + } + return allSourceFileExist; + } + + private static boolean handleWhenAllSourceFilesExist( + String fullStorageGroupName, + TsFileResource targetTsFile, + List<TsFileResource> selectedTsFileResourceList, + TsFileResourceList tsFileResourceList) { + // all source file exists, delete the target file + LOGGER.info( + "{} [Compaction][ExceptionHandler] all source files {} exists, delete target file {}", + fullStorageGroupName, + selectedTsFileResourceList, + targetTsFile); + if (targetTsFile.remove()) { + tsFileResourceList.writeLock(); + try { + // add the source tsfile to TsFileResourceList + // in case of we have removed them from list before + for (TsFileResource tsFileResource : selectedTsFileResourceList) { + if (!tsFileResourceList.contains(tsFileResource)) { + tsFileResourceList.keepOrderInsert(tsFileResource); + } + } Review comment: In compaction, we could delete the source File in disk first, then remove it in List. Then this insertion is not needed. ########## File path: server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.inner; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileManager; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This class is used to handle exception (including OOM error) occurred during compaction. The + * <i>allowCompaction</i> flag in {@link org.apache.iotdb.db.engine.storagegroup.TsFileManager} may + * be set to false if exception cannot be handled correctly(such as OOM during handling exception), + * after which the subsequent compaction in this storage group will not be carried out. Under some + * serious circumstances(such as data lost), the system will be set to read-only. + */ +public class InnerSpaceCompactionExceptionHandler { + private static final Logger LOGGER = LoggerFactory.getLogger("COMPACTION"); + + public static void handleException( + String fullStorageGroupName, + File logFile, + TsFileResource targetTsFile, + List<TsFileResource> selectedTsFileResourceList, + TsFileManager tsFileManager, + TsFileResourceList tsFileResourceList) { + + if (!logFile.exists()) { + // log file does not exist + // it means the compaction has not started yet + // we need not to handle it + return; + } + + boolean handleSuccess = true; + + List<TsFileResource> lostSourceFiles = new ArrayList<>(); + boolean allSourceFileExist = + checkAllSourceFilesExist(selectedTsFileResourceList, lostSourceFiles); + + if (allSourceFileExist) { + handleSuccess = + handleWhenAllSourceFilesExist( + fullStorageGroupName, targetTsFile, selectedTsFileResourceList, tsFileResourceList); + } else { + // some source file does not exists + // it means we start to delete source file + LOGGER.info( + "{} [Compaction][ExceptionHandler] some source files {} is lost", + fullStorageGroupName, + lostSourceFiles); + if (!targetTsFile.getTsFile().exists()) { + // some source files are missed, and target file not exists + // some data is lost, set the system to read-only + LOGGER.warn( + "{} [Compaction][ExceptionHandler] target file {} does not exist either, do nothing. Set system to read-only", + fullStorageGroupName, + targetTsFile); + IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); + handleSuccess = false; + } else { + handleSuccess = + handleWhenSomeSourceFilesLost( + fullStorageGroupName, + targetTsFile, + selectedTsFileResourceList, + tsFileResourceList, + lostSourceFiles); + } + } + + if (handleSuccess) { Review comment: ```suggestion if (!handleSuccess) { ``` ########## File path: server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java ########## @@ -541,24 +542,64 @@ private static boolean isFileListHasModifications( public static void deleteTsFilesInDisk( Collection<TsFileResource> mergeTsFiles, String storageGroupName) { - logger.info("{} [compaction] merge starts to delete real file ", storageGroupName); + logger.info("{} [Compaction] Compaction starts to delete real file ", storageGroupName); for (TsFileResource mergeTsFile : mergeTsFiles) { deleteTsFile(mergeTsFile); logger.info( "{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath()); } } + /** Delete all modification files for source files */ + public static void deleteModificationForSourceFile( + Collection<TsFileResource> sourceFiles, String storageGroupName) throws IOException { + logger.info("{} [Compaction] Start to delete modifications of source files", storageGroupName); + for (TsFileResource tsFileResource : sourceFiles) { + ModificationFile compactionModificationFile = + ModificationFile.getCompactionMods(tsFileResource); + if (compactionModificationFile.exists()) { + compactionModificationFile.remove(); + } + + ModificationFile normalModification = ModificationFile.getNormalMods(tsFileResource); + if (normalModification.exists()) { + normalModification.remove(); + } + } + } + + /** + * Collect all the compaction modification files of source files, and combines them as the + * modification file of target file. + */ + public static void combineModsInCompaction( + Collection<TsFileResource> mergeTsFiles, TsFileResource targetTsFile) throws IOException { + List<Modification> modifications = new ArrayList<>(); + for (TsFileResource mergeTsFile : mergeTsFiles) { + try (ModificationFile sourceCompactionModificationFile = + ModificationFile.getCompactionMods(mergeTsFile)) { + modifications.addAll(sourceCompactionModificationFile.getModifications()); + } + } + if (!modifications.isEmpty()) { + try (ModificationFile modificationFile = ModificationFile.getNormalMods(targetTsFile)) { + for (Modification modification : modifications) { + // we have to set modification offset to MAX_VALUE, as the offset of source chunk may + // change after compaction + modification.setFileOffset(Long.MAX_VALUE); + modificationFile.write(modification); + } + } + } + } + public static void deleteTsFile(TsFileResource seqFile) { Review comment: could this be replaced by TsFileResource.remove()? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
