HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839357502



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.wal.buffer.IWALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
+import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.TsFilePathUtils;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. 
*/
+public class WALNode implements IWALNode {
+  public static final Pattern WAL_NODE_FOLDER_PATTERN = 
Pattern.compile("(?<nodeIdentifier>\\d+)");
+
+  private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final long MAX_STORAGE_SPACE_IN_BYTE =
+      config.getWalNodeMaxStorageSpaceInMb() * 1024 * 1024;
+  private static final long MEM_TABLE_SNAPSHOT_THRESHOLD_IN_BYTE =
+      config.getWalMemTableSnapshotThreshold();
+
+  /** unique identifier of this WALNode */
+  private final String identifier;
+  /** directory to store this node's files */
+  private final String logDirectory;
+  /** wal buffer */
+  private final IWALBuffer buffer;
+  /** manage checkpoints */
+  private final CheckpointManager checkpointManager;
+
+  public WALNode(String identifier, String logDirectory) throws 
FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal node-{}.", logDirectory, 
identifier);
+    }
+    this.buffer = new WALBuffer(identifier, logDirectory);
+    this.checkpointManager = new CheckpointManager(identifier, logDirectory);
+  }
+
+  /** Return true when this folder wal node folder */
+  public static boolean walNodeFolderNameFilter(File dir, String name) {
+    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, InsertPlan insertPlan) {
+    WALEdit walEdit = new WALEdit(memTableId, insertPlan);
+    return log(walEdit);
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, DeletePlan deletePlan) {
+    WALEdit walEdit = new WALEdit(memTableId, deletePlan);
+    return log(walEdit);
+  }
+
+  private WALFlushListener log(WALEdit walEdit) {
+    buffer.write(walEdit);
+    return walEdit.getWalFlushListener();
+  }
+
+  @Override
+  public void onFlushStart(IMemTable memTable) {
+    // do nothing
+  }
+
+  @Override
+  public void onFlushEnd(IMemTable memTable) {

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.wal.buffer.IWALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
+import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.TsFilePathUtils;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. 
*/
+public class WALNode implements IWALNode {
+  public static final Pattern WAL_NODE_FOLDER_PATTERN = 
Pattern.compile("(?<nodeIdentifier>\\d+)");
+
+  private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final long MAX_STORAGE_SPACE_IN_BYTE =
+      config.getWalNodeMaxStorageSpaceInMb() * 1024 * 1024;
+  private static final long MEM_TABLE_SNAPSHOT_THRESHOLD_IN_BYTE =
+      config.getWalMemTableSnapshotThreshold();
+
+  /** unique identifier of this WALNode */
+  private final String identifier;
+  /** directory to store this node's files */
+  private final String logDirectory;
+  /** wal buffer */
+  private final IWALBuffer buffer;
+  /** manage checkpoints */
+  private final CheckpointManager checkpointManager;
+
+  public WALNode(String identifier, String logDirectory) throws 
FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal node-{}.", logDirectory, 
identifier);
+    }
+    this.buffer = new WALBuffer(identifier, logDirectory);
+    this.checkpointManager = new CheckpointManager(identifier, logDirectory);
+  }
+
+  /** Return true when this folder wal node folder */
+  public static boolean walNodeFolderNameFilter(File dir, String name) {
+    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, InsertPlan insertPlan) {
+    WALEdit walEdit = new WALEdit(memTableId, insertPlan);
+    return log(walEdit);
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, DeletePlan deletePlan) {
+    WALEdit walEdit = new WALEdit(memTableId, deletePlan);
+    return log(walEdit);
+  }
+
+  private WALFlushListener log(WALEdit walEdit) {
+    buffer.write(walEdit);
+    return walEdit.getWalFlushListener();
+  }
+
+  @Override
+  public void onFlushStart(IMemTable memTable) {

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;

Review comment:
       Added.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage checkpoints of one wal node */
+public class CheckpointManager implements AutoCloseable {
+  /** use size limit to control WALEdit number in each file */
+  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(CheckpointManager.class);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  /** WALNode identifier of this checkpoint manager */
+  protected final String identifier;
+  /** directory to store .checkpoint file */
+  protected final String logDirectory;
+  /**
+   * protect concurrent safety of checkpoint info, including memTableId2Info, 
cachedByteBuffer,
+   * currentLogVersion and currentLogWriter
+   */
+  private final Lock infoLock = new ReentrantLock();
+  // region these variables should be protected by infoLock
+  /** memTable id -> memTable info */
+  private final Map<Integer, MemTableInfo> memTableId2Info = new HashMap<>();
+  /** cache the biggest byte buffer to serialize checkpoint */
+  private volatile ByteBuffer cachedByteBuffer;
+  /** current checkpoint file version id, only updated by fsyncAndDeleteThread 
*/
+  private int currentLogVersion = 0;
+  /** current checkpoint file log writer, only updated by fsyncAndDeleteThread 
*/
+  private ILogWriter currentLogWriter;
+  // endregion
+
+  public CheckpointManager(String identifier, String logDirectory) throws 
FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, 
identifier);
+    }
+    currentLogWriter =
+        new CheckpointWriter(
+            SystemFileFactory.INSTANCE.getFile(
+                logDirectory, 
CheckpointWriter.getLogFileName(currentLogVersion)));
+    makeGlobalInfoCP();
+    fsyncCheckpointFile();
+  }
+
+  /**
+   * make checkpoint for global memTables' info, this checkpoint only exists 
in the beginning of
+   * each checkpoint file
+   */
+  private void makeGlobalInfoCP() {
+    infoLock.lock();
+    try {
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.GLOBAL_MEMORY_TABLE_INFO, new 
ArrayList<>(memTableId2Info.values()));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  /** make checkpoint for create memTable info */
+  public void makeCreateMemTableCP(MemTableInfo memTableInfo) {
+    infoLock.lock();
+    try {
+      memTableId2Info.put(memTableInfo.getMemTableId(), memTableInfo);
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.CREATE_MEMORY_TABLE, 
Collections.singletonList(memTableInfo));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  /** make checkpoint for flush memTable info */
+  public void makeFlushMemTableCP(int memTableId) {
+    infoLock.lock();
+    try {
+      MemTableInfo memTableInfo = memTableId2Info.remove(memTableId);
+      if (memTableInfo == null) {
+        return;
+      }
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.FLUSH_MEMORY_TABLE, 
Collections.singletonList(memTableInfo));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  private void logByCachedByteBuffer(Checkpoint checkpoint) {
+    // make sure cached ByteBuffer has enough capacity
+    int estimateSize = checkpoint.serializedSize();
+    if (cachedByteBuffer == null || estimateSize > 
cachedByteBuffer.capacity()) {
+      cachedByteBuffer = ByteBuffer.allocate(estimateSize);
+    }
+    checkpoint.serialize(cachedByteBuffer);
+    try {
+      currentLogWriter.write(cachedByteBuffer);
+    } catch (IOException e) {
+      logger.error("Fail to make checkpoint: {}", checkpoint, e);
+    } finally {
+      cachedByteBuffer.clear();
+    }
+  }
+
+  // region Task to fsync checkpoint file
+  /** Fsync checkpoints to the disk */
+  public void fsyncCheckpointFile() {
+    infoLock.lock();
+    try {
+      try {
+        currentLogWriter.force();
+      } catch (IOException e) {
+        logger.error(
+            "Fail to fsync wal node-{}'s checkpoint writer, change system mode 
to read-only.",
+            identifier,
+            e);
+        config.setReadOnly(true);
+      }
+
+      try {
+        if (tryRollingLogWriter()) {
+          // first log global memTables' info, then delete old checkpoint file
+          makeGlobalInfoCP();
+          currentLogWriter.force();
+          File oldFile =
+              SystemFileFactory.INSTANCE.getFile(
+                  logDirectory, 
CheckpointWriter.getLogFileName(currentLogVersion - 1));
+          oldFile.delete();
+        }
+      } catch (IOException e) {
+        logger.error(
+            "Fail to roll wal node-{}'s checkpoint writer, change system mode 
to read-only.",
+            identifier,
+            e);
+        config.setReadOnly(true);
+      }
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  private boolean tryRollingLogWriter() throws IOException {
+    if (currentLogWriter.size() < LOG_SIZE_LIMIT) {
+      return false;
+    }
+    currentLogWriter.close();
+    currentLogVersion++;
+    File nextLogFile =
+        SystemFileFactory.INSTANCE.getFile(
+            logDirectory, CheckpointWriter.getLogFileName(currentLogVersion));
+    currentLogWriter = new CheckpointWriter(nextLogFile);
+    return true;
+  }
+  // endregion
+
+  public MemTableInfo getOldestMemTableInfo() {
+    // find oldest memTable
+    List<MemTableInfo> memTableInfos;
+    infoLock.lock();
+    try {
+      memTableInfos = new ArrayList<>(memTableId2Info.values());
+    } finally {
+      infoLock.unlock();
+    }
+    if (memTableInfos.isEmpty()) {
+      return null;
+    }
+    MemTableInfo oldestMemTableInfo = memTableInfos.get(0);
+    for (MemTableInfo memTableInfo : memTableInfos) {
+      if (oldestMemTableInfo.getFirstFileVersionId() > 
memTableInfo.getFirstFileVersionId()) {
+        oldestMemTableInfo = memTableInfo;
+      }
+    }
+    return oldestMemTableInfo;
+  }
+
+  /**
+   * Get version id of first valid .wal file
+   *
+   * @return Return {@link Integer#MIN_VALUE} if no file is valid
+   */
+  public int getFirstValidVersionId() {

Review comment:
       Fixed.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage checkpoints of one wal node */
+public class CheckpointManager implements AutoCloseable {
+  /** use size limit to control WALEdit number in each file */
+  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(CheckpointManager.class);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  /** WALNode identifier of this checkpoint manager */
+  protected final String identifier;
+  /** directory to store .checkpoint file */
+  protected final String logDirectory;
+  /**
+   * protect concurrent safety of checkpoint info, including memTableId2Info, 
cachedByteBuffer,
+   * currentLogVersion and currentLogWriter
+   */
+  private final Lock infoLock = new ReentrantLock();
+  // region these variables should be protected by infoLock
+  /** memTable id -> memTable info */
+  private final Map<Integer, MemTableInfo> memTableId2Info = new HashMap<>();
+  /** cache the biggest byte buffer to serialize checkpoint */
+  private volatile ByteBuffer cachedByteBuffer;
+  /** current checkpoint file version id, only updated by fsyncAndDeleteThread 
*/
+  private int currentLogVersion = 0;

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees the concurrent safety and uses double buffers 
mechanism to accelerate
+ * writes and avoid waiting for buffer syncing to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  private static final Logger logger = 
LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final long FSYNC_WAL_DELAY_IN_MS = 
config.getFsyncWalDelayInMs();
+  /** default delay time of each serialize task when wal mode is async */
+  public static final long ASYNC_WAL_DELAY_IN_MS = 100;
+  /** Maximum number of WALEdits in one serialize task when wal mode is sync */
+  public static final int SYNC_BATCH_SIZE_LIMIT = 100;

Review comment:
       Added.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -224,19 +222,20 @@ public void insert(InsertRowPlan insertRowPlan) throws 
WriteProcessException {
       }
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
-        getLogNode().write(insertRowPlan);
-      } catch (Exception e) {
-        if (enableMemControl && memIncrements != null) {
-          rollbackMemoryInfo(memIncrements);
-        }
-        throw new WriteProcessException(
-            String.format(
-                "%s: %s write WAL failed",
-                storageGroupName, 
tsFileResource.getTsFile().getAbsolutePath()),
-            e);
+    try {
+      WALFlushListener walFlushListener = 
walNode.log(workMemTable.getMemTableId(), insertRowPlan);
+      if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) 
{
+        throw walFlushListener.getCause();
+      }
+    } catch (Exception e) {
+      if (enableMemControl && memIncrements != null) {

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to