HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839349780



##########
File path: 
server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.recover.file;
+
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.exception.WALRecoverException;
+import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+/**
+ * This class is used to help recover all unsealed TsFiles at zero level. There are 3 main
+ * procedures: start recovery, redo logs, and end recovery, you must call them in order. Notice:
+ * This class doesn't guarantee concurrency safety.
+ */
+public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerformer {
+  private static final Logger logger =
+      LoggerFactory.getLogger(UnsealedTsFileRecoverPerformer.class);
+
+  /** sequence file or not */
+  private final boolean sequence;
+  /** add recovered TsFile back to virtual storage group */
+  private final Consumer<UnsealedTsFileRecoverPerformer> callbackAfterUnsealedTsFileRecovered;
+  /** redo wal log to recover TsFile */
+  private final TsFilePlanRedoer walRedoer;
+  /** trace result of this recovery */
+  private final WALRecoverListener recoverListener;
+
+  public UnsealedTsFileRecoverPerformer(
+      TsFileResource tsFileResource,
+      boolean sequence,
+      VirtualStorageGroupProcessor vsgProcessor,
+      Consumer<UnsealedTsFileRecoverPerformer> callbackAfterUnsealedTsFileRecovered) {
+    super(tsFileResource);
+    this.sequence = sequence;
+    this.callbackAfterUnsealedTsFileRecovered = callbackAfterUnsealedTsFileRecovered;
+    this.walRedoer = new TsFilePlanRedoer(tsFileResource, sequence, vsgProcessor);
+    this.recoverListener = new WALRecoverListener(tsFileResource.getTsFilePath());
+  }
+
+  /**
+   * Make preparation for recovery, including load .resource file (reconstruct when necessary) and
+   * truncate the file to remaining corrected data.
+   */
+  public void startRecovery() throws StorageGroupProcessorException, IOException {
+    super.recoverWithWriter();
+
+    if (hasCrashed()) {
+      // tsfile has crashed due to failure,
+      // the last ChunkGroup may contain the same data as the WALs,
+      // so the time map must be updated first to avoid duplicated insertion
+      loadResourceFromWriter();
+    }
+  }
+
+  private void loadResourceFromWriter() {

Review comment:
       Fixed.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
##########
@@ -34,136 +31,69 @@
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.writelog.io.ILogReader;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Supplier;
 
 /**
- * LogReplayer finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the
- * WALs from the logNode and redoes them into a given MemTable and ModificationFile.
+ * This class helps redo wal logs into a TsFile. Notice: You should update time map in {@link
+ * TsFileResource} before using this class to avoid duplicated insertion and this class doesn't
+ * guarantee concurrency safety.
  */
-public class LogReplayer {
-
-  private Logger logger = LoggerFactory.getLogger(LogReplayer.class);
-  private String logNodePrefix;
-  private String insertFilePath;
-  private ModificationFile modFile;
-  private TsFileResource currentTsFileResource;
-  private IMemTable recoverMemTable;
-
-  // only unsequence file tolerates duplicated data
-  private boolean sequence;
-
-  private Map<String, Long> tempStartTimeMap = new HashMap<>();
-  private Map<String, Long> tempEndTimeMap = new HashMap<>();
-
-  public LogReplayer(
-      String logNodePrefix,
-      String insertFilePath,
-      ModificationFile modFile,
-      TsFileResource currentTsFileResource,
-      IMemTable memTable,
-      boolean sequence) {
-    this.logNodePrefix = logNodePrefix;
-    this.insertFilePath = insertFilePath;
-    this.modFile = modFile;
-    this.currentTsFileResource = currentTsFileResource;
-    this.recoverMemTable = memTable;
+public class TsFilePlanRedoer {
+  private static final Logger logger = LoggerFactory.getLogger(TsFilePlanRedoer.class);
+
+  private final TsFileResource tsFileResource;
+  /** only unsequence file tolerates duplicated data */
+  private final boolean sequence;
+  /** this TsFile's virtual storage group */
+  private final VirtualStorageGroupProcessor vsgProcessor;

Review comment:
       Fixed.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -455,7 +332,7 @@ public void setReady(boolean ready) {
   }
 
   /** this class is used to store recovering context */
-  private class RecoveryContext {
+  private class VSGRecoveryContext {
     /** number of files to be recovered */
     private final long filesToRecoverNum;

Review comment:
       Fixed.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -467,7 +344,7 @@ public void setReady(boolean ready) {
     /** last recovery log files num */
     private long lastLogCheckFilesNum;
 
-    public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+    public VSGRecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {

Review comment:
       Fixed.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -467,7 +344,7 @@ public void setReady(boolean ready) {
     /** last recovery log files num */
     private long lastLogCheckFilesNum;
 
-    public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+    public VSGRecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
       this.filesToRecoverNum = filesToRecoverNum;
       this.recoveredFilesNum = recoveredFilesNum;

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to