qiaojialin commented on a change in pull request #4498:
URL: https://github.com/apache/iotdb/pull/4498#discussion_r767238869
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
##########
@@ -110,59 +79,100 @@ protected void doCompaction() throws Exception {
.getName();
TsFileResource targetTsFileResource =
new TsFileResource(new File(dataDirectory + File.separator +
targetFileName));
- LOGGER.info(
+ LOGGER.warn(
"{} [Compaction] starting compaction task with {} files",
fullStorageGroupName,
selectedTsFileResourceList.size());
File logFile = null;
+ SizeTieredCompactionLogger sizeTieredCompactionLogger = null;
+ // to mark if we got the write lock or read lock of the selected tsfile
+ boolean[] isHoldingReadLock = new
boolean[selectedTsFileResourceList.size()];
+ boolean[] isHoldingWriteLock = new
boolean[selectedTsFileResourceList.size()];
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ isHoldingReadLock[i] = false;
+ isHoldingWriteLock[i] = false;
+ }
+ LOGGER.warn(
+ "{} [Compaction] Try to get the read lock of all selected files",
fullStorageGroupName);
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ selectedTsFileResourceList.get(i).readLock();
+ isHoldingReadLock[i] = true;
+ }
+
try {
logFile =
new File(
dataDirectory
+ File.separator
+ targetFileName
+ SizeTieredCompactionLogger.COMPACTION_LOG_NAME);
- SizeTieredCompactionLogger sizeTieredCompactionLogger =
- new SizeTieredCompactionLogger(logFile.getPath());
+ sizeTieredCompactionLogger = new
SizeTieredCompactionLogger(logFile.getPath());
+
for (TsFileResource resource : selectedTsFileResourceList) {
sizeTieredCompactionLogger.logFileInfo(SOURCE_INFO,
resource.getTsFile());
}
sizeTieredCompactionLogger.logSequence(sequence);
sizeTieredCompactionLogger.logFileInfo(TARGET_INFO,
targetTsFileResource.getTsFile());
- LOGGER.info(
+ LOGGER.warn(
"{} [Compaction] compaction with {}", fullStorageGroupName,
selectedTsFileResourceList);
+
// carry out the compaction
InnerSpaceCompactionUtils.compact(
targetTsFileResource, selectedTsFileResourceList,
fullStorageGroupName, true);
- LOGGER.info(
+ LOGGER.warn(
"{} [SizeTiredCompactionTask] compact finish, close the logger",
fullStorageGroupName);
sizeTieredCompactionLogger.close();
- LOGGER.info(
+ LOGGER.warn(
"{} [Compaction] compaction finish, start to delete old files",
fullStorageGroupName);
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException(
String.format("%s [Compaction] abort", fullStorageGroupName));
}
+ LOGGER.warn(
Review comment:
```suggestion
LOGGER.info(
```
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
##########
@@ -110,59 +79,100 @@ protected void doCompaction() throws Exception {
.getName();
TsFileResource targetTsFileResource =
new TsFileResource(new File(dataDirectory + File.separator +
targetFileName));
- LOGGER.info(
+ LOGGER.warn(
"{} [Compaction] starting compaction task with {} files",
fullStorageGroupName,
selectedTsFileResourceList.size());
File logFile = null;
+ SizeTieredCompactionLogger sizeTieredCompactionLogger = null;
+ // to mark if we got the write lock or read lock of the selected tsfile
+ boolean[] isHoldingReadLock = new
boolean[selectedTsFileResourceList.size()];
+ boolean[] isHoldingWriteLock = new
boolean[selectedTsFileResourceList.size()];
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ isHoldingReadLock[i] = false;
+ isHoldingWriteLock[i] = false;
+ }
+ LOGGER.warn(
+ "{} [Compaction] Try to get the read lock of all selected files",
fullStorageGroupName);
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ selectedTsFileResourceList.get(i).readLock();
+ isHoldingReadLock[i] = true;
+ }
+
try {
logFile =
new File(
dataDirectory
+ File.separator
+ targetFileName
+ SizeTieredCompactionLogger.COMPACTION_LOG_NAME);
- SizeTieredCompactionLogger sizeTieredCompactionLogger =
- new SizeTieredCompactionLogger(logFile.getPath());
+ sizeTieredCompactionLogger = new
SizeTieredCompactionLogger(logFile.getPath());
+
for (TsFileResource resource : selectedTsFileResourceList) {
sizeTieredCompactionLogger.logFileInfo(SOURCE_INFO,
resource.getTsFile());
}
sizeTieredCompactionLogger.logSequence(sequence);
sizeTieredCompactionLogger.logFileInfo(TARGET_INFO,
targetTsFileResource.getTsFile());
- LOGGER.info(
+ LOGGER.warn(
"{} [Compaction] compaction with {}", fullStorageGroupName,
selectedTsFileResourceList);
+
// carry out the compaction
InnerSpaceCompactionUtils.compact(
targetTsFileResource, selectedTsFileResourceList,
fullStorageGroupName, true);
- LOGGER.info(
+ LOGGER.warn(
"{} [SizeTiredCompactionTask] compact finish, close the logger",
fullStorageGroupName);
sizeTieredCompactionLogger.close();
- LOGGER.info(
+ LOGGER.warn(
"{} [Compaction] compaction finish, start to delete old files",
fullStorageGroupName);
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException(
String.format("%s [Compaction] abort", fullStorageGroupName));
}
+ LOGGER.warn(
+ "{} [Compaction] Compacted target files, try to get the write lock
of source files",
+ fullStorageGroupName);
+
+ // release the read lock of all source files, and get the write lock of
them to delete them
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ selectedTsFileResourceList.get(i).readUnlock();
+ isHoldingReadLock[i] = false;
+ selectedTsFileResourceList.get(i).writeLock();
+ isHoldingWriteLock[i] = true;
+ }
+ LOGGER.warn(
Review comment:
```suggestion
LOGGER.info(
```
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.inner;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class is used to handle exception (including OOM error) occurred
during compaction. The
+ * <i>allowCompaction</i> flag in {@link
org.apache.iotdb.db.engine.storagegroup.TsFileManager} may
+ * be set to false if exception cannot be handled correctly(such as OOM during
handling exception),
+ * after which the subsequent compaction in this storage group will not be
carried out. Under some
+ * serious circumstances(such as data lost), the system will be set to
read-only.
+ */
+public class InnerSpaceCompactionExceptionHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger("COMPACTION");
+
+ public static void handleException(
+ String fullStorageGroupName,
+ File logFile,
+ TsFileResource targetTsFile,
+ List<TsFileResource> selectedTsFileResourceList,
+ TsFileManager tsFileManager,
+ TsFileResourceList tsFileResourceList) {
+
+ if (!logFile.exists()) {
+ // log file does not exist
+ // it means the compaction has not started yet
+ // we need not to handle it
+ return;
+ }
+
+ boolean handleSuccess = true;
+
+ List<TsFileResource> lostSourceFiles = new ArrayList<>();
+ boolean allSourceFileExist =
+ checkAllSourceFilesExist(selectedTsFileResourceList, lostSourceFiles);
+
+ if (allSourceFileExist) {
+ handleSuccess =
+ handleWhenAllSourceFilesExist(
+ fullStorageGroupName, targetTsFile, selectedTsFileResourceList,
tsFileResourceList);
+ } else {
+ // some source file does not exists
+ // it means we start to delete source file
+ LOGGER.info(
+ "{} [Compaction][ExceptionHandler] some source files {} is lost",
+ fullStorageGroupName,
+ lostSourceFiles);
+ if (!targetTsFile.getTsFile().exists()) {
+ // some source files are missed, and target file not exists
+ // some data is lost, set the system to read-only
+ LOGGER.warn(
+ "{} [Compaction][ExceptionHandler] target file {} does not exist
either, do nothing. Set system to read-only",
+ fullStorageGroupName,
+ targetTsFile);
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+ handleSuccess = false;
+ } else {
+ handleSuccess =
+ handleWhenSomeSourceFilesLost(
+ fullStorageGroupName,
+ targetTsFile,
+ selectedTsFileResourceList,
+ tsFileResourceList,
+ lostSourceFiles);
+ }
+ }
+
+ if (!handleSuccess) {
+ LOGGER.error(
+ "{} [Compaction][ExceptionHandler] Failed to handle exception, set
allowCompaction to false",
+ fullStorageGroupName);
+ tsFileManager.setAllowCompaction(false);
+ } else {
+ LOGGER.info(
+ "{} [Compaction][ExceptionHandler] Handle exception successfully,
delete log file {}",
+ fullStorageGroupName,
+ logFile);
+ try {
+ FileUtils.delete(logFile);
+ } catch (IOException e) {
+ LOGGER.error(
+ "{} [Compaction][ExceptionHandler] Exception occurs while deleting
log file {}, set allowCompaction to false",
+ fullStorageGroupName,
+ logFile,
+ e);
+ tsFileManager.setAllowCompaction(false);
+ }
+ }
+ }
+
+ private static boolean checkAllSourceFilesExist(
+ List<TsFileResource> sourceFiles, List<TsFileResource> lostSourceFiles) {
+ boolean allSourceFileExist = true;
+ for (TsFileResource sourceTsFile : sourceFiles) {
+ if (!sourceTsFile.getTsFile().exists()) {
+ allSourceFileExist = false;
+ lostSourceFiles.add(sourceTsFile);
+ }
+ }
+ return allSourceFileExist;
+ }
+
+ private static boolean handleWhenAllSourceFilesExist(
+ String fullStorageGroupName,
+ TsFileResource targetTsFile,
+ List<TsFileResource> selectedTsFileResourceList,
+ TsFileResourceList tsFileResourceList) {
+ // all source file exists, delete the target file
+ LOGGER.info(
+ "{} [Compaction][ExceptionHandler] all source files {} exists, delete
target file {}",
+ fullStorageGroupName,
+ selectedTsFileResourceList,
+ targetTsFile);
+ if (!targetTsFile.remove()) {
+ // failed to remove target tsfile
+ // system should not carry out the subsequent compaction in case of data
redundant
+ LOGGER.warn(
+ "{} [Compaction][ExceptionHandler] failed to remove target file {}",
+ fullStorageGroupName,
+ targetTsFile);
+ return false;
+ }
+ // deal with compaction modification
+ try {
+ for (TsFileResource sourceFile : selectedTsFileResourceList) {
+ if (sourceFile.getCompactionModFile().exists()) {
+ ModificationFile compactionModificationFile =
+ ModificationFile.getCompactionMods(sourceFile);
+ Collection<Modification> newModification =
compactionModificationFile.getModifications();
+ compactionModificationFile.close();
+ // write the modifications to a new modification file
+ sourceFile.resetModFile();
Review comment:
If mods exists, write to the existing mods file, not a new mods
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]