JackieTien97 commented on code in PR #17169:
URL: https://github.com/apache/iotdb/pull/17169#discussion_r2938036636


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
+            performPeriodicMaintenance();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;

Review Comment:
   Should not return here; if it's not in the stop state, you should continue. You can 
refer to `AbstractDriverThread`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/AbstractTableSizeCacheWriter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractTableSizeCacheWriter {
+  protected static final Logger logger =
+      LoggerFactory.getLogger(AbstractTableSizeCacheWriter.class);
+  public static final String TEMP_CACHE_FILE_SUBFIX = ".tmp";
+  protected final int regionId;
+  protected long previousCompactionTimestamp = System.currentTimeMillis();
+  protected long lastWriteTimestamp = System.currentTimeMillis();
+  protected int currentIndexFileVersion = 0;
+  protected final File dir;
+
+  public AbstractTableSizeCacheWriter(String database, int regionId) {
+    this.regionId = regionId;
+    this.dir = StorageEngine.getDataRegionSystemDir(database, regionId + "");
+  }
+
+  protected void failedToRecover(Exception e) {
+    TableDiskUsageCache.getInstance().failedToRecover(e);
+  }
+
+  protected void deleteOldVersionFiles(int maxVersion, String prefix, 
List<File> files) {
+    for (File file : files) {
+      try {
+        int version = 
Integer.parseInt(file.getName().substring(prefix.length()));
+        if (version != maxVersion) {
+          Files.deleteIfExists(file.toPath());
+        }
+      } catch (Exception e) {
+        logger.warn(
+            "Failed to delete old version table size cache file {}", 
file.getAbsolutePath());
+      }
+    }
+  }
+
+  public void closeIfIdle() {
+    if (System.currentTimeMillis() - lastWriteTimestamp >= 
TimeUnit.MINUTES.toMillis(1)) {

Review Comment:
   ```suggestion
       if (System.currentTimeMillis() - lastWriteTimestamp >= 
TimeUnit.MINUTES.toMillis(60)) {
   ```
   1 minute is too short.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.AbstractTableSizeCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TimePartitionTableSizeQueryContext;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class TsFileTableDiskUsageCacheWriter extends 
AbstractTableSizeCacheWriter {
+  public static final String TSFILE_CACHE_KEY_FILENAME_PREFIX = 
"TableSizeKeyFile_";
+  public static final String TSFILE_CACHE_VALUE_FILENAME_PREFIX = 
"TableSizeValueFile_";
+  public static final int KEY_FILE_OFFSET_RECORD_LENGTH = 5 * Long.BYTES + 1;
+  public static final int KEY_FILE_REDIRECT_RECORD_LENGTH = 7 * Long.BYTES + 1;

Review Comment:
   This constant is never used; consider removing it.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1165,7 +1215,297 @@ public boolean hasNext() {
     }
   }
 
-  private abstract static class TsBlockSupplier implements Iterator<TsBlock> {
+  private static class TableDiskUsageSupplier implements 
IInformationSchemaContentSupplier {
+    private final List<TSDataType> dataTypes;
+    private final Map<String, List<TTableInfo>> databaseTableInfoMap;
+    private final Filter pushDownFilter;
+    private final PaginationController paginationController;
+    private final OperatorContext operatorContext;
+
+    private DataRegion currentDataRegion;
+    private boolean currentDatabaseOnlyHasOneTable;
+
+    private TableDiskUsageCacheReader currentDataRegionCacheReader;
+    private DataRegionTableSizeQueryContext 
currentDataRegionTableSizeQueryContext;
+
+    private final StorageEngineTimePartitionIterator dataRegionIterator;
+
+    private long prepareCacheReaderCostInNS = 0;
+    private long loadObjectFileCostInNS = 0;
+    private long prepareCachedTsFileIDCostInNS = 0;
+    private long checkAllFilesInTsFileManagerCostInNS = 0;
+    private long readTsFileCacheValueFilesCostInNS = 0;
+
+    private TableDiskUsageSupplier(
+        final List<TSDataType> dataTypes,
+        final UserEntity userEntity,
+        final Filter pushDownFilter,
+        final PaginationController paginationController,
+        final OperatorContext operatorContext,
+        final List<Integer> regionsForCurrentSubTask)
+        throws TException, ClientManagerException {
+      this.dataTypes = dataTypes;
+      this.pushDownFilter = pushDownFilter;
+      this.paginationController = paginationController;
+      this.operatorContext = operatorContext;
+      
AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity);
+      this.databaseTableInfoMap =
+          
operatorContext.getInstanceContext().getDataNodeQueryContext().getDatabaseTableInfoMap();
+      Set<Integer> regions = new HashSet<>(regionsForCurrentSubTask);
+      operatorContext.recordSpecifiedInfo(
+          PlanGraphPrinter.REGIONS_OF_CURRENT_SUB_TASK, 
regionsForCurrentSubTask.toString());
+      this.dataRegionIterator =
+          new StorageEngineTimePartitionIterator(
+              Optional.of(
+                  dataRegion -> {
+                    List<TTableInfo> tTableInfos =
+                        databaseTableInfoMap.get(dataRegion.getDatabaseName());
+                    if (tTableInfos == null || tTableInfos.isEmpty()) {
+                      return false;
+                    }
+                    if (!regions.contains(dataRegion.getDataRegionId())) {
+                      return false;
+                    }
+                    currentDataRegionTableSizeQueryContext =
+                        new DataRegionTableSizeQueryContext(
+                            false, operatorContext.getInstanceContext());
+                    return true;
+                  }),
+              Optional.empty());

Review Comment:
   Why is this empty here? Is it because the time filter cannot be pushed down?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableSizeCacheReader.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.DataRegionTableSizeQueryContext;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TsFileTableSizeCacheReader {

Review Comment:
   ```suggestion
   public class TsFileTableSizeIndexReader {
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java:
##########
@@ -174,6 +180,23 @@ public PlanNode 
visitMultiChildProcess(MultiChildProcessNode node, RewriterConte
       return node;
     }
 
+    @Override
+    public PlanNode visitCollect(CollectNode node, RewriterContext context) {
+      PlanNode newNode = node.clone();
+      RewriterContext subContext = new RewriterContext(context.getAnalysis());
+      if (context.getLimit() > 0) {
+        subContext.setLimit(context.getLimit() + context.getOffset());
+      }
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, subContext));
+      }
+      if (node.getChildren().size() > 1) {
+        // keep parent limit/offset node
+        context.setEnablePushDown(false);
+      }

Review Comment:
   When will the CollectNode have only one child?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java:
##########
@@ -141,6 +142,18 @@ public List<PlanNode> visitMergeSort(MergeSortNode node, 
DistributionPlanContext
     return Collections.singletonList(newRoot);
   }
 
+  @Override
+  public List<PlanNode> visitCollect(CollectNode node, DistributionPlanContext 
context) {
+    CollectNode newRoot =
+        new CollectNode(
+            context.queryContext.getQueryId().genPlanNodeId(), 
node.getOutputColumnNames());
+    for (int i = 0; i < node.getChildren().size(); i++) {
+      List<PlanNode> rewroteNodes = rewrite(node.getChildren().get(i), 
context);
+      rewroteNodes.forEach(newRoot::addChild);
+    }
+    return Collections.singletonList(newRoot);
+  }

Review Comment:
   Why is this override needed? What's the difference from 
`BaseSourceRewriter.defaultRewrite()`?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/object/IObjectTableSizeCacheReader.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object;
+
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.DataRegionTableSizeQueryContext;
+
+import java.io.IOException;
+
+public interface IObjectTableSizeCacheReader extends AutoCloseable {
+  boolean loadObjectFileTableSize(

Review Comment:
   Add Javadoc for this method.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
+            performPeriodicMaintenance();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          LOGGER.error("Meet exception when apply TableDiskUsageCache 
operation.", e);
+        }
+      }
+    } finally {
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+    }
+  }
+
+  private void performPeriodicMaintenance() {
+    checkAndMayCloseIdleWriter();
+    compactIfNecessary(TimeUnit.SECONDS.toMillis(1));
+    processedOperationCountSinceLastPeriodicCheck = 0;
+  }
+
+  /**
+   * Any unrecoverable error in a single writer will mark the whole 
TableDiskUsageCache as failed
+   * and disable further operations.
+   */
+  protected void failedToRecover(Exception e) {
+    failedToRecover = true;
+    LOGGER.error("Failed to recover TableDiskUsageCache", e);
+  }
+
+  protected void 
syncTsFileTableSizeCacheIfNecessary(DataRegionTableSizeCacheWriter writer) {
+    try {
+      writer.tsFileCacheWriter.syncIfNecessary();
+    } catch (IOException e) {
+      LOGGER.warn("Failed to sync tsfile table size cache.", e);
+    }
+  }
+
+  // Hook for subclasses to persist pending object table size deltas. No-op by 
default.
+  protected void 
persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
+
+  protected void compactIfNecessary(long maxRunTime) {
+    if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      if (System.currentTimeMillis() - startTime > maxRunTime) {
+        break;
+      }
+      if (writer.getActiveReaderNum() > 0) {
+        continue;
+      }
+      writer.compactIfNecessary();
+    }
+  }
+
+  protected void checkAndMayCloseIdleWriter() {
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      writer.closeIfIdle();
+    }
+  }
+
+  public void write(String database, TsFileID tsFileID, Map<String, Long> 
tableSizeMap) {
+    if (tableSizeMap == null || tableSizeMap.isEmpty()) {
+      // tree model
+      return;
+    }
+    addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap));
+  }
+
+  public void write(String database, TsFileID originTsFileID, TsFileID 
newTsFileID) {
+    addOperationToQueue(new ReplaceTsFileOperation(database, originTsFileID, 
newTsFileID));
+  }
+
+  public void writeObjectDelta(
+      String database, int regionId, long timePartition, String table, long 
size, int num) {
+    throw new UnsupportedOperationException("writeObjectDelta");
+  }
+
+  public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
+      DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+    StartReadOperation operation =
+        new StartReadOperation(dataRegion, readTsFileCache, 
readObjectFileCache);
+    if (!addOperationToQueue(operation)) {
+      operation.future.complete(
+          new Pair<>(
+              new TsFileTableSizeCacheReader(0, null, 0, null, 
dataRegion.getDataRegionId()),
+              new EmptyObjectTableSizeCacheReader()));
+    }
+    return operation.future;
+  }
+
+  public void endRead(DataRegion dataRegion) {
+    EndReadOperation operation = new EndReadOperation(dataRegion);
+    addOperationToQueue(operation);
+  }
+
+  public void registerRegion(DataRegion region) {
+    RegisterRegionOperation operation = new RegisterRegionOperation(region);
+    if (!region.isTableModel()) {
+      return;
+    }
+    addOperationToQueue(operation);
+  }
+
+  public void remove(String database, int regionId) {
+    RemoveRegionOperation operation = new RemoveRegionOperation(database, 
regionId);
+    if (!addOperationToQueue(operation)) {
+      return;
+    }
+    try {
+      operation.future.get(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOGGER.error("Meet exception when remove TableDiskUsageCache.", e);
+    }
+  }
+
+  protected boolean addOperationToQueue(Operation operation) {
+    if (failedToRecover || stop) {
+      return false;
+    }
+    try {
+      queue.put(operation);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    return true;
+  }
+
+  public int getQueueSize() {
+    return queue.size();
+  }
+
+  public void close() {
+    if (scheduledExecutorService == null) {
+      return;
+    }
+    try {
+      stop = true;
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+      writerMap.clear();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @TestOnly
+  public void ensureRunning() {
+    stop = false;
+    failedToRecover = false;
+    if (scheduledExecutorService.isTerminated()) {
+      scheduledExecutorService =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.FILE_TIME_INDEX_RECORD.getName());
+      scheduledExecutorService.submit(this::run);
+    }
+  }
+
+  protected DataRegionTableSizeCacheWriter createWriter(
+      String database, int regionId, DataRegion region) {
+    return new DataRegionTableSizeCacheWriter(database, regionId, region);
+  }
+
+  protected TsFileTableSizeCacheReader createTsFileCacheReader(
+      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
+    TsFileTableDiskUsageCacheWriter tsFileCacheWriter = 
dataRegionWriter.tsFileCacheWriter;
+    return new TsFileTableSizeCacheReader(
+        tsFileCacheWriter.keyFileLength(),
+        tsFileCacheWriter.getKeyFile(),
+        tsFileCacheWriter.valueFileLength(),
+        tsFileCacheWriter.getValueFile(),
+        regionId);
+  }
+
+  protected IObjectTableSizeCacheReader createObjectFileCacheReader(
+      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
+    return new EmptyObjectTableSizeCacheReader();
+  }
+
+  protected abstract static class Operation {
+    protected final String database;
+    protected final int regionId;
+
+    protected Operation(String database, int regionId) {
+      this.database = database;
+      this.regionId = regionId;
+    }
+
+    public int getRegionId() {
+      return regionId;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+
+    public abstract void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException;
+  }
+
+  protected static class StartReadOperation extends Operation {
+    protected final DataRegion region;
+    protected final boolean readTsFileCache;
+    protected final boolean readObjectFileCache;
+    public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> future =
+        new CompletableFuture<>();
+
+    public StartReadOperation(
+        DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
+      this.region = dataRegion;
+      this.readTsFileCache = readTsFileCache;
+      this.readObjectFileCache = readObjectFileCache;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter writer = 
tableDiskUsageCache.writerMap.get(regionId);
+      try {
+        if (writer == null || writer.getRemovedFuture() != null) {
+          // region is removing or removed
+          future.complete(
+              new Pair<>(
+                  new TsFileTableSizeCacheReader(0, null, 0, null, regionId),
+                  new EmptyObjectTableSizeCacheReader()));
+          return;
+        }
+        writer.increaseActiveReaderNum();
+        // Flush buffered writes to ensure readers observe a consistent 
snapshot
+        writer.flush();
+        TsFileTableSizeCacheReader tsFileTableSizeCacheReader =
+            readTsFileCache ? 
tableDiskUsageCache.createTsFileCacheReader(writer, regionId) : null;
+        IObjectTableSizeCacheReader objectTableSizeCacheReader =
+            readObjectFileCache
+                ? tableDiskUsageCache.createObjectFileCacheReader(writer, 
regionId)
+                : null;
+        future.complete(new Pair<>(tsFileTableSizeCacheReader, 
objectTableSizeCacheReader));
+      } catch (Throwable t) {
+        future.completeExceptionally(t);
+      }
+    }
+  }
+
+  private static class EndReadOperation extends Operation {
+    protected final DataRegion region;
+
+    public EndReadOperation(DataRegion dataRegion) {
+      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
+      this.region = dataRegion;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      tableDiskUsageCache.writerMap.computeIfPresent(
+          regionId,
+          (k, writer) -> {
+            if (writer.dataRegion != region) {
+              return writer;
+            }
+            writer.decreaseActiveReaderNum();
+            // Complete pending remove when the last reader exits
+            if (writer.getRemovedFuture() != null) {
+              writer.close();
+              writer.getRemovedFuture().complete(null);
+              writer.setRemovedFuture(null);
+              return null;
+            }
+            return writer;
+          });
+    }
+  }
+
+  private static class WriteOperation extends Operation {
+
+    private final TsFileID tsFileID;
+    private final Map<String, Long> tableSizeMap;
+
+    protected WriteOperation(String database, TsFileID tsFileID, Map<String, 
Long> tableSizeMap) {
+      super(database, tsFileID.regionId);
+      this.tsFileID = tsFileID;
+      this.tableSizeMap = tableSizeMap;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter dataRegionTableSizeCacheWriter =
+          tableDiskUsageCache.writerMap.get(regionId);
+      if (dataRegionTableSizeCacheWriter != null) {
+        dataRegionTableSizeCacheWriter.tsFileCacheWriter.write(tsFileID, 
tableSizeMap);
+      }
+    }
+  }
+
+  private static class ReplaceTsFileOperation extends Operation {
+    private final TsFileID originTsFileID;
+    private final TsFileID newTsFileID;
+
+    public ReplaceTsFileOperation(String database, TsFileID originTsFileID, 
TsFileID newTsFileID) {
+      super(database, originTsFileID.regionId);
+      this.originTsFileID = originTsFileID;
+      this.newTsFileID = newTsFileID;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter writer = 
tableDiskUsageCache.writerMap.get(regionId);
+      if (writer != null) {
+        writer.tsFileCacheWriter.write(originTsFileID, newTsFileID);
+      }
+    }
+  }
+
+  protected static class RegisterRegionOperation extends Operation {
+
+    protected final DataRegion dataRegion;
+    protected final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    public RegisterRegionOperation(DataRegion dataRegion) {
+      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
+      this.dataRegion = dataRegion;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) {
+      tableDiskUsageCache.writerMap.computeIfAbsent(
+          regionId, regionId -> tableDiskUsageCache.createWriter(database, 
regionId, dataRegion));
+      future.complete(null);
+    }
+
+    public CompletableFuture<Void> getFuture() {
+      return future;
+    }
+  }
+
+  private static class RemoveRegionOperation extends Operation {
+
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    private RemoveRegionOperation(String database, int regionId) {
+      super(database, regionId);
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) {
+      tableDiskUsageCache.writerMap.computeIfPresent(
+          regionId,
+          (k, writer) -> {
+            if (writer.getActiveReaderNum() > 0) {
+              // If there are active readers, defer removal until all readers 
finish
+              writer.setRemovedFuture(future);
+              return writer;
+            }
+            writer.close();
+            future.complete(null);
+            return null;
+          });
+    }
+  }
+
+  public static TableDiskUsageCache getInstance() {
+    return TableDiskUsageCache.InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final TableDiskUsageCache INSTANCE = loadInstance();
+
+    private static TableDiskUsageCache loadInstance() {
+      ServiceLoader<TableDiskUsageCacheProvider> loader =
+          ServiceLoader.load(TableDiskUsageCacheProvider.class);
+      for (TableDiskUsageCacheProvider provider : loader) {
+        return provider.create();
+      }
+      return new DefaultTableDiskUsageCacheProvider().create();
+    }
+  }
+
+  protected static class DataRegionTableSizeCacheWriter {
+    protected final DataRegion dataRegion;
+    protected final TsFileTableDiskUsageCacheWriter tsFileCacheWriter;
+    protected int activeReaderNum = 0;
+    protected CompletableFuture<Void> removedFuture;
+
    /** Binds this per-region writer to its region and its on-disk TsFile size-cache writer. */
    protected DataRegionTableSizeCacheWriter(String database, int regionId, DataRegion dataRegion) {
      this.dataRegion = dataRegion;
      this.tsFileCacheWriter = new TsFileTableDiskUsageCacheWriter(database, regionId);
    }
+
    // Reader counting is done on the single cache worker thread (operations are applied
    // sequentially in run()), so plain int increments are sufficient here.
    public void increaseActiveReaderNum() {
      activeReaderNum++;
    }
+
+    public void decreaseActiveReaderNum() {
+      if (activeReaderNum > 0) {
+        activeReaderNum--;
+      }

Review Comment:
   The `else` branch should at least print an error log, indicating that there may be a bug.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {

Review Comment:
   What if a flush happens exactly every 1s? I think we need another condition to 
trigger the snapshot:
   
   Record the elapsed time since the last periodic check — e.g. if 10s of wall time have 
passed but only 1s (10%) was spent applying operations, then I think we can do the snapshot.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
  protected TableDiskUsageCache() {
    // Single worker thread: writerMap and per-writer reader counts are confined to it,
    // which is why they need no synchronization elsewhere in this class.
    scheduledExecutorService =
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
            ThreadName.FILE_TIME_INDEX_RECORD.getName());
    scheduledExecutorService.submit(this::run);
  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
+            performPeriodicMaintenance();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          LOGGER.error("Meet exception when apply TableDiskUsageCache 
operation.", e);
+        }
+      }
+    } finally {
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+    }
+  }
+
+  private void performPeriodicMaintenance() {
+    checkAndMayCloseIdleWriter();
+    compactIfNecessary(TimeUnit.SECONDS.toMillis(1));
+    processedOperationCountSinceLastPeriodicCheck = 0;
+  }
+
+  /**
+   * Any unrecoverable error in a single writer will mark the whole 
TableDiskUsageCache as failed
+   * and disable further operations.
+   */
+  protected void failedToRecover(Exception e) {
+    failedToRecover = true;
+    LOGGER.error("Failed to recover TableDiskUsageCache", e);
+  }
+
+  protected void 
syncTsFileTableSizeCacheIfNecessary(DataRegionTableSizeCacheWriter writer) {
+    try {
+      writer.tsFileCacheWriter.syncIfNecessary();
+    } catch (IOException e) {
+      LOGGER.warn("Failed to sync tsfile table size cache.", e);
+    }
+  }
+
  /**
   * Hook for subclasses to persist pending object table size deltas. No-op by default: the base
   * implementation tracks TsFile sizes only.
   */
  protected void persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
+
+  protected void compactIfNecessary(long maxRunTime) {
+    if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      if (System.currentTimeMillis() - startTime > maxRunTime) {
+        break;
+      }
+      if (writer.getActiveReaderNum() > 0) {
+        continue;
+      }
+      writer.compactIfNecessary();
+    }
+  }
+
+  protected void checkAndMayCloseIdleWriter() {
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      writer.closeIfIdle();
+    }
+  }
+
+  public void write(String database, TsFileID tsFileID, Map<String, Long> 
tableSizeMap) {
+    if (tableSizeMap == null || tableSizeMap.isEmpty()) {
+      // tree model
+      return;
+    }
+    addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap));
+  }
+
+  public void write(String database, TsFileID originTsFileID, TsFileID 
newTsFileID) {
+    addOperationToQueue(new ReplaceTsFileOperation(database, originTsFileID, 
newTsFileID));
+  }
+
  /**
   * Records an object-storage size delta for a table. Unsupported in the base implementation;
   * subclasses that track object files override this.
   */
  public void writeObjectDelta(
      String database, int regionId, long timePartition, String table, long size, int num) {
    throw new UnsupportedOperationException("writeObjectDelta");
  }
+
+  public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
+      DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+    StartReadOperation operation =
+        new StartReadOperation(dataRegion, readTsFileCache, 
readObjectFileCache);
+    if (!addOperationToQueue(operation)) {
+      operation.future.complete(
+          new Pair<>(
+              new TsFileTableSizeCacheReader(0, null, 0, null, 
dataRegion.getDataRegionId()),
+              new EmptyObjectTableSizeCacheReader()));
+    }
+    return operation.future;
+  }
+
+  public void endRead(DataRegion dataRegion) {
+    EndReadOperation operation = new EndReadOperation(dataRegion);
+    addOperationToQueue(operation);
+  }
+
+  public void registerRegion(DataRegion region) {
+    RegisterRegionOperation operation = new RegisterRegionOperation(region);
+    if (!region.isTableModel()) {
+      return;
+    }
+    addOperationToQueue(operation);
+  }
+
+  public void remove(String database, int regionId) {
+    RemoveRegionOperation operation = new RemoveRegionOperation(database, 
regionId);
+    if (!addOperationToQueue(operation)) {
+      return;
+    }
+    try {
+      operation.future.get(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOGGER.error("Meet exception when remove TableDiskUsageCache.", e);
+    }
+  }
+
+  protected boolean addOperationToQueue(Operation operation) {
+    if (failedToRecover || stop) {
+      return false;
+    }
+    try {
+      queue.put(operation);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    return true;
+  }
+
  /** Number of operations currently waiting to be applied (monitoring/test hook). */
  public int getQueueSize() {
    return queue.size();
  }
+
+  public void close() {
+    if (scheduledExecutorService == null) {
+      return;
+    }
+    try {
+      stop = true;
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+      writerMap.clear();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
  /** Restarts the worker after a test-initiated close so subsequent tests see a live cache. */
  @TestOnly
  public void ensureRunning() {
    stop = false;
    failedToRecover = false;
    // A terminated executor cannot be reused; build a fresh one and resubmit the loop.
    if (scheduledExecutorService.isTerminated()) {
      scheduledExecutorService =
          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
              ThreadName.FILE_TIME_INDEX_RECORD.getName());
      scheduledExecutorService.submit(this::run);
    }
  }
+
  /** Factory hook; subclasses may return a writer that additionally tracks object files. */
  protected DataRegionTableSizeCacheWriter createWriter(
      String database, int regionId, DataRegion region) {
    return new DataRegionTableSizeCacheWriter(database, regionId, region);
  }
+
+  protected TsFileTableSizeCacheReader createTsFileCacheReader(
+      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
+    TsFileTableDiskUsageCacheWriter tsFileCacheWriter = 
dataRegionWriter.tsFileCacheWriter;
+    return new TsFileTableSizeCacheReader(
+        tsFileCacheWriter.keyFileLength(),
+        tsFileCacheWriter.getKeyFile(),
+        tsFileCacheWriter.valueFileLength(),
+        tsFileCacheWriter.getValueFile(),
+        regionId);
+  }
+
  /** Object-file sizes are not tracked in the base implementation; returns an empty reader. */
  protected IObjectTableSizeCacheReader createObjectFileCacheReader(
      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
    return new EmptyObjectTableSizeCacheReader();
  }
+
+  protected abstract static class Operation {
+    protected final String database;
+    protected final int regionId;
+
+    protected Operation(String database, int regionId) {
+      this.database = database;
+      this.regionId = regionId;
+    }
+
+    public int getRegionId() {
+      return regionId;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+
+    public abstract void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException;
+  }
+
+  protected static class StartReadOperation extends Operation {
+    protected final DataRegion region;
+    protected final boolean readTsFileCache;
+    protected final boolean readObjectFileCache;
+    public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> future =
+        new CompletableFuture<>();
+
+    public StartReadOperation(
+        DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
+      this.region = dataRegion;
+      this.readTsFileCache = readTsFileCache;
+      this.readObjectFileCache = readObjectFileCache;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter writer = 
tableDiskUsageCache.writerMap.get(regionId);
+      try {
+        if (writer == null || writer.getRemovedFuture() != null) {
+          // region is removing or removed
+          future.complete(
+              new Pair<>(
+                  new TsFileTableSizeCacheReader(0, null, 0, null, regionId),
+                  new EmptyObjectTableSizeCacheReader()));
+          return;
+        }
+        writer.increaseActiveReaderNum();
+        // Flush buffered writes to ensure readers observe a consistent 
snapshot
+        writer.flush();
+        TsFileTableSizeCacheReader tsFileTableSizeCacheReader =
+            readTsFileCache ? 
tableDiskUsageCache.createTsFileCacheReader(writer, regionId) : null;
+        IObjectTableSizeCacheReader objectTableSizeCacheReader =
+            readObjectFileCache
+                ? tableDiskUsageCache.createObjectFileCacheReader(writer, 
regionId)
+                : null;
+        future.complete(new Pair<>(tsFileTableSizeCacheReader, 
objectTableSizeCacheReader));
+      } catch (Throwable t) {
+        future.completeExceptionally(t);
+      }
+    }
+  }
+
+  private static class EndReadOperation extends Operation {
+    protected final DataRegion region;
+
+    public EndReadOperation(DataRegion dataRegion) {
+      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
+      this.region = dataRegion;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      tableDiskUsageCache.writerMap.computeIfPresent(
+          regionId,
+          (k, writer) -> {
+            if (writer.dataRegion != region) {
+              return writer;
+            }
+            writer.decreaseActiveReaderNum();
+            // Complete pending remove when the last reader exits
+            if (writer.getRemovedFuture() != null) {

Review Comment:
   Shouldn't only the last active reader complete the pending removal? Otherwise the writer is closed while other readers are still active.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DefaultTableDiskUsageCacheProvider.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+public class DefaultTableDiskUsageCacheProvider implements 
TableDiskUsageCacheProvider {

Review Comment:
   ```suggestion
   public class DefaultTableDiskUsageIndexProvider implements 
TableDiskUsageCacheProvider {
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.AbstractTableSizeCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TimePartitionTableSizeQueryContext;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class TsFileTableDiskUsageCacheWriter extends 
AbstractTableSizeCacheWriter {

Review Comment:
   ```suggestion
   public class TsFileTableDiskUsageIndexWriter extends 
AbstractTableSizeCacheWriter {
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1165,7 +1215,297 @@ public boolean hasNext() {
     }
   }
 
-  private abstract static class TsBlockSupplier implements Iterator<TsBlock> {
+  private static class TableDiskUsageSupplier implements 
IInformationSchemaContentSupplier {
+    private final List<TSDataType> dataTypes;
+    private final Map<String, List<TTableInfo>> databaseTableInfoMap;
+    private final Filter pushDownFilter;
+    private final PaginationController paginationController;
+    private final OperatorContext operatorContext;
+
+    private DataRegion currentDataRegion;
+    private boolean currentDatabaseOnlyHasOneTable;
+
+    private TableDiskUsageCacheReader currentDataRegionCacheReader;
+    private DataRegionTableSizeQueryContext 
currentDataRegionTableSizeQueryContext;
+
+    private final StorageEngineTimePartitionIterator dataRegionIterator;
+
+    private long prepareCacheReaderCostInNS = 0;
+    private long loadObjectFileCostInNS = 0;
+    private long prepareCachedTsFileIDCostInNS = 0;
+    private long checkAllFilesInTsFileManagerCostInNS = 0;
+    private long readTsFileCacheValueFilesCostInNS = 0;
+
    /**
     * Supplies table-disk-usage rows for information_schema, iterating only the data regions
     * assigned to this sub-task whose databases have table metadata.
     */
    private TableDiskUsageSupplier(
        final List<TSDataType> dataTypes,
        final UserEntity userEntity,
        final Filter pushDownFilter,
        final PaginationController paginationController,
        final OperatorContext operatorContext,
        final List<Integer> regionsForCurrentSubTask)
        throws TException, ClientManagerException {
      this.dataTypes = dataTypes;
      this.pushDownFilter = pushDownFilter;
      this.paginationController = paginationController;
      this.operatorContext = operatorContext;
      // Privilege check happens up front; it may throw before any region is touched.
      AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity);
      this.databaseTableInfoMap =
          operatorContext.getInstanceContext().getDataNodeQueryContext().getDatabaseTableInfoMap();
      Set<Integer> regions = new HashSet<>(regionsForCurrentSubTask);
      operatorContext.recordSpecifiedInfo(
          PlanGraphPrinter.REGIONS_OF_CURRENT_SUB_TASK, regionsForCurrentSubTask.toString());
      this.dataRegionIterator =
          new StorageEngineTimePartitionIterator(
              Optional.of(
                  dataRegion -> {
                    // Accept only regions of this sub-task whose database has table info.
                    List<TTableInfo> tTableInfos =
                        databaseTableInfoMap.get(dataRegion.getDatabaseName());
                    if (tTableInfos == null || tTableInfos.isEmpty()) {
                      return false;
                    }
                    if (!regions.contains(dataRegion.getDataRegionId())) {
                      return false;
                    }
                    // NOTE: acceptance has the side effect of resetting the per-region
                    // query context used by hasNextInternal().
                    currentDataRegionTableSizeQueryContext =
                        new DataRegionTableSizeQueryContext(
                            false, operatorContext.getInstanceContext());
                    return true;
                  }),
              Optional.empty());
    }
+
+    @Override
+    public boolean hasNext() {
+      boolean result = hasNextInternal();
+      if (!result) {
+        updateSpecifiedInfo();
+      }
+      return result;
+    }
+
    /**
     * Advances to the next data region that still has tables to scan, lazily creating a cache
     * reader for it. Returns true while a reader is (or becomes) available.
     */
    private boolean hasNextInternal() {
      if (currentDataRegionCacheReader != null) {
        // Previous region's reader has not been consumed by next() yet.
        return true;
      }
      if (!paginationController.hasCurLimit()) {
        // LIMIT exhausted: stop iterating regions entirely.
        return false;
      }
      try {
        while (dataRegionIterator.nextDataRegion()) {
          currentDataRegion = dataRegionIterator.currentDataRegion();
          for (Long timePartition : currentDataRegion.getTsFileManager().getTimePartitions()) {
            Map<String, Long> tablesToScan = getTablesToScan(currentDataRegion, timePartition);
            if (!tablesToScan.isEmpty()) {
              currentDataRegionTableSizeQueryContext.addTimePartition(
                  timePartition, new TimePartitionTableSizeQueryContext(tablesToScan));
            }
          }
          if (currentDataRegionTableSizeQueryContext.isEmpty()) {
            // Nothing in this region survived filter/pagination; try the next one.
            continue;
          }
          currentDataRegionCacheReader =
              new TableDiskUsageCacheReader(
                  currentDataRegion,
                  currentDataRegionTableSizeQueryContext,
                  currentDatabaseOnlyHasOneTable);
          return true;
        }
      } catch (Exception e) {
        // Release the reader (and its snapshot) before surfacing the failure.
        closeDataRegionReader();
        throw new IoTDBRuntimeException(
            e.getMessage(), e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
      }
      return false;
    }
+
+    private void updateSpecifiedInfo() {
+      if (operatorContext
+          .getSpecifiedInfo()
+          .containsKey(PlanGraphPrinter.PREPARE_CACHE_READER_COST)) {
+        return;
+      }
+      operatorContext.recordSpecifiedInfo(
+          PlanGraphPrinter.PREPARE_CACHE_READER_COST,
+          TimeUnit.NANOSECONDS.toMillis(prepareCacheReaderCostInNS)
+              + IoTDBConstant.SPACE
+              + RpcUtils.MILLISECOND);
+      operatorContext.recordSpecifiedInfo(
+          PlanGraphPrinter.LOAD_OBJECT_FILE_COST,
+          TimeUnit.NANOSECONDS.toMillis(loadObjectFileCostInNS)
+              + IoTDBConstant.SPACE
+              + RpcUtils.MILLISECOND);
+      operatorContext.recordSpecifiedInfo(
+          PlanGraphPrinter.PREPARE_CACHED_TSFILE_ID_COST,
+          TimeUnit.NANOSECONDS.toMillis(prepareCachedTsFileIDCostInNS)
+              + IoTDBConstant.SPACE
+              + RpcUtils.MILLISECOND);
+      operatorContext.recordSpecifiedInfo(
+          PlanGraphPrinter.CHECK_ALL_FILES_IN_TSFILE_MANAGER_COST,
+          TimeUnit.NANOSECONDS.toMillis(checkAllFilesInTsFileManagerCostInNS)
+              + IoTDBConstant.SPACE
+              + RpcUtils.MILLISECOND);
+      operatorContext.recordSpecifiedInfo(
+          PlanGraphPrinter.READ_TSFILE_CACHE_VALUE_FILES_COST,
+          TimeUnit.NANOSECONDS.toMillis(readTsFileCacheValueFilesCostInNS)
+              + IoTDBConstant.SPACE
+              + RpcUtils.MILLISECOND);
+    }
+
    /**
     * Selects the BASE tables of {@code dataRegion}'s database that pass the push-down filter
     * and pagination for the given time partition, each initialized with size 0.
     *
     * <p>Side effects: consumes offset/limit from {@code paginationController} and updates
     * {@code currentDatabaseOnlyHasOneTable}.
     */
    private Map<String, Long> getTablesToScan(DataRegion dataRegion, long timePartition) {
      String databaseName = dataRegion.getDatabaseName();
      List<TTableInfo> tTableInfos = databaseTableInfoMap.get(databaseName);
      if (tTableInfos == null || tTableInfos.isEmpty()) {
        return Collections.emptyMap();
      }

      Map<String, Long> tablesToScan = new TreeMap<>();
      int totalValidTableCount = 0;
      for (TTableInfo tTableInfo : tTableInfos) {
        // Only BASE_TABLE entries are counted toward disk usage.
        if (tTableInfo.getType() != TableType.BASE_TABLE.ordinal()) {
          continue;
        }
        totalValidTableCount++;
        if (pushDownFilter != null) {
          // Row layout: database, table, data node id, region id, time partition.
          Object[] row = new Object[5];
          row[0] = new Binary(dataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET);
          row[1] = new Binary(tTableInfo.getTableName(), TSFileConfig.STRING_CHARSET);
          row[2] = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
          row[3] = dataRegion.getDataRegionId();
          row[4] = timePartition;
          if (!pushDownFilter.satisfyRow(0, row)) {
            continue;
          }
        }
        if (!paginationController.hasCurLimit()) {
          break;
        }
        if (paginationController.hasCurOffset()) {
          paginationController.consumeOffset();
          continue;
        }
        paginationController.consumeLimit();
        tablesToScan.put(tTableInfo.getTableName(), 0L);
      }
      currentDatabaseOnlyHasOneTable = totalValidTableCount == 1;
      return tablesToScan;
    }
+
+    @Override
+    public TsBlock next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      long maxRuntime = 
OperatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+      long start = System.nanoTime();
+      long prevStageEndTime = start;
+
+      try {
+        try {
+          if (!currentDataRegionCacheReader.prepareCacheReader(start, 
maxRuntime)) {
+            return null;
+          }
+        } finally {
+          long now = System.nanoTime();
+          prepareCacheReaderCostInNS += now - prevStageEndTime;
+          prevStageEndTime = now;
+        }
+
+        try {
+          if 
(!currentDataRegionCacheReader.loadObjectFileTableSizeCache(start, maxRuntime)) 
{
+            return null;
+          }
+        } finally {
+          long now = System.nanoTime();
+          loadObjectFileCostInNS += now - prevStageEndTime;
+          prevStageEndTime = now;
+        }
+
+        try {
+          if (!currentDataRegionCacheReader.prepareCachedTsFileIDKeys(start, 
maxRuntime)) {
+            return null;
+          }
+        } finally {
+          long now = System.nanoTime();
+          prepareCachedTsFileIDCostInNS += now - prevStageEndTime;
+          prevStageEndTime = now;
+        }
+
+        try {
+          if 
(!currentDataRegionCacheReader.checkAllFilesInTsFileManager(start, maxRuntime)) 
{
+            return null;
+          }
+        } finally {
+          long now = System.nanoTime();
+          checkAllFilesInTsFileManagerCostInNS += now - prevStageEndTime;
+          prevStageEndTime = now;
+        }
+
+        try {
+          if 
(!currentDataRegionCacheReader.readCacheValueFilesAndUpdateResultMap(
+              start, maxRuntime)) {
+            return null;
+          }
+        } finally {
+          readTsFileCacheValueFilesCostInNS += System.nanoTime() - 
prevStageEndTime;
+        }
+
+        return buildTsBlock();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
+      } catch (Exception e) {
+        throw new IoTDBRuntimeException(
+            e.getMessage(), e, 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      }
+    }
+
+    private TsBlock buildTsBlock() {
+      TsBlockBuilder builder = new TsBlockBuilder(dataTypes);
+      for (Map.Entry<Long, TimePartitionTableSizeQueryContext> entry :
+          currentDataRegionTableSizeQueryContext
+              .getTimePartitionTableSizeQueryContextMap()
+              .entrySet()) {
+        long timePartition = entry.getKey();
+        for (Map.Entry<String, Long> tableSizeEntry :
+            entry.getValue().getTableSizeResultMap().entrySet()) {
+          String tableName = tableSizeEntry.getKey();
+          long size = tableSizeEntry.getValue();
+          builder.getTimeColumnBuilder().writeLong(0);
+          ColumnBuilder[] columns = builder.getValueColumnBuilders();
+
+          columns[0].writeBinary(
+              new Binary(currentDataRegion.getDatabaseName(), 
TSFileConfig.STRING_CHARSET));
+          columns[1].writeBinary(new Binary(tableName, 
TSFileConfig.STRING_CHARSET));
+          
columns[2].writeInt(IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
+          columns[3].writeInt(currentDataRegion.getDataRegionId());
+          columns[4].writeLong(timePartition);
+          columns[5].writeLong(size);
+          builder.declarePosition();
+        }
+      }
+      closeDataRegionReader();
+      return builder.build();
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeDataRegionReader();
+    }
+
+    private void closeDataRegionReader() {
+      if (currentDataRegionCacheReader == null) {
+        return;
+      }
+      try {
+        currentDataRegionCacheReader.close();
+        currentDataRegionCacheReader = null;
+      } catch (IOException ignored) {

Review Comment:
   why ignore this exception? it should at least be logged at info or warn level



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
+            performPeriodicMaintenance();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          LOGGER.error("Meet exception when apply TableDiskUsageCache 
operation.", e);
+        }
+      }
+    } finally {
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+    }
+  }
+
+  private void performPeriodicMaintenance() {
+    checkAndMayCloseIdleWriter();

Review Comment:
   what if we write data to some inactive region again after a long time?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/AbstractTableSizeCacheWriter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractTableSizeCacheWriter {

Review Comment:
   ```suggestion
   public abstract class AbstractTableSizeIndexWriter {
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class TableDiskUsageCacheReader implements Closeable {
+
+  private final DataRegion dataRegion;
+  private final DataRegionTableSizeQueryContext dataRegionContext;
+
+  private CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>>
+      prepareReaderFuture;
+  private TsFileTableSizeCacheReader tsFileTableSizeCacheReader;
+  private IObjectTableSizeCacheReader objectTableSizeCacheReader;
+
+  private boolean objectFileSizeLoaded = false;
+  private boolean tsFileIdKeysPrepared = false;
+  private boolean allTsFileResourceChecked = false;
+
+  private final Iterator<Map.Entry<Long, TimePartitionTableSizeQueryContext>> 
timePartitionIterator;
+
+  private final boolean currentDatabaseOnlyHasOneTable;
+  private TableDiskUsageStatisticUtil tableDiskUsageStatisticUtil;
+
+  private final List<Pair<TsFileID, Long>> tsFilesToQueryInCache = new 
ArrayList<>();
+  private Iterator<Pair<TsFileID, Long>> tsFilesToQueryInCacheIterator = null;
+
+  public TableDiskUsageCacheReader(
+      DataRegion dataRegion,
+      DataRegionTableSizeQueryContext dataRegionContext,
+      boolean databaseHasOnlyOneTable) {
+    this.dataRegion = dataRegion;
+    this.dataRegionContext = dataRegionContext;
+    this.currentDatabaseOnlyHasOneTable = databaseHasOnlyOneTable;
+    if (dataRegionContext.isNeedAllData()) {
+      
dataRegionContext.addAllTimePartitionsInTsFileManager(dataRegion.getTsFileManager());
+    }
+    this.timePartitionIterator =
+        
dataRegionContext.getTimePartitionTableSizeQueryContextMap().entrySet().iterator();
+    dataRegionContext.reserveMemoryForResultMap();
+  }
+
+  public boolean prepareCacheReader(long startTime, long maxRunTime) throws 
Exception {
+    if (this.tsFileTableSizeCacheReader == null) {
+      this.prepareReaderFuture =
+          this.prepareReaderFuture == null
+              ? TableDiskUsageCache.getInstance().startRead(dataRegion, true, 
true)
+              : this.prepareReaderFuture;
+      do {
+        if (prepareReaderFuture.isDone()) {
+          Pair<TsFileTableSizeCacheReader, IObjectTableSizeCacheReader> 
readerPair =
+              prepareReaderFuture.get();
+          this.tsFileTableSizeCacheReader = readerPair.left;
+          this.tsFileTableSizeCacheReader.openKeyFile();
+          this.objectTableSizeCacheReader = readerPair.right;
+          break;
+        } else {
+          Thread.sleep(1);

Review Comment:
   ```suggestion
             prepareReaderFuture.wait(System.nanoTime() - startTime);
   
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java:
##########
@@ -171,6 +171,9 @@ public void endFile() throws IOException {
       // set empty target file to DELETED
       if (isEmptyFile[i]) {
         targetResources.get(i).forceMarkDeleted();
+      } else if (compactionTaskSummary != null) {
+        compactionTaskSummary.recordTargetTsFileTableSizeMap(
+            targetResources.get(i), 
targetFileWriters.get(i).getTableSizeMap());

Review Comment:
   else if or standalone if branch? if targetResources.get(i) is empty, should 
we recordTargetTsFileTableSizeMap for this compaction?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/object/IObjectTableSizeCacheReader.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object;
+
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.DataRegionTableSizeQueryContext;
+
+import java.io.IOException;
+
+public interface IObjectTableSizeCacheReader extends AutoCloseable {
+  boolean loadObjectFileTableSize(
+      DataRegionTableSizeQueryContext dataRegionContext, long startTime, long 
maxRunTime)
+      throws IOException;
+
+  @Override
+  void close();

Review Comment:
   why do you need to override it in an interface?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {

Review Comment:
   ```suggestion
   public class TableDiskUsageIndex {
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java:
##########
@@ -3652,29 +3653,82 @@ public Analysis visitShowQueries(
     analysis.setSourceExpressions(sourceExpressions);
     sourceExpressions.forEach(expression -> analyzeExpressionType(analysis, 
expression));
 
-    analyzeWhere(analysis, showQueriesStatement);
+    if (!analyzeWhere(
+        analysis,
+        showQueriesStatement.getWhereCondition(),
+        ColumnHeaderConstant.showQueriesColumnHeaders)) {
+      showQueriesStatement.setWhereCondition(null);
+    }
 
     analysis.setMergeOrderParameter(new 
OrderByParameter(showQueriesStatement.getSortItemList()));
 
     return analysis;
   }
 
-  private void analyzeWhere(Analysis analysis, ShowQueriesStatement 
showQueriesStatement) {
-    WhereCondition whereCondition = showQueriesStatement.getWhereCondition();
+  @Override
+  public Analysis visitShowDiskUsage(
+      ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) {
+    Analysis analysis = new Analysis();
+    analysis.setRealStatement(showDiskUsageStatement);
+    
analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowDiskUsageHeader());
+    analysis.setVirtualSource(true);
+
+    List<TDataNodeLocation> allReadableDataNodeLocations = 
getReadableDataNodeLocations();
+    if (allReadableDataNodeLocations.isEmpty()) {
+      throw new StatementAnalyzeException("no Running DataNodes");
+    }
+    analysis.setReadableDataNodeLocations(allReadableDataNodeLocations);
+
+    Set<Expression> sourceExpressions = new HashSet<>();
+    for (ColumnHeader columnHeader : 
analysis.getRespDatasetHeader().getColumnHeaders()) {
+      sourceExpressions.add(
+          TimeSeriesOperand.constructColumnHeaderExpression(
+              columnHeader.getColumnName(), columnHeader.getColumnType()));
+    }
+    analysis.setSourceExpressions(sourceExpressions);
+    sourceExpressions.forEach(expression -> analyzeExpressionType(analysis, 
expression));
+
+    if (!analyzeWhere(
+        analysis,
+        showDiskUsageStatement.getWhereCondition(),
+        ColumnHeaderConstant.showDiskUsageColumnHeaders)) {
+      showDiskUsageStatement.setWhereCondition(null);
+    }
+
+    analysis.setMergeOrderParameter(new 
OrderByParameter(showDiskUsageStatement.getSortItemList()));
+
+    return analysis;
+  }
+
+  private boolean analyzeWhere(

Review Comment:
   add Javadoc for the method, at least describing when it returns true and 
when it returns false



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
+            performPeriodicMaintenance();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          LOGGER.error("Meet exception when apply TableDiskUsageCache 
operation.", e);
+        }
+      }
+    } finally {
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+    }
+  }
+
+  private void performPeriodicMaintenance() {
+    checkAndMayCloseIdleWriter();
+    compactIfNecessary(TimeUnit.SECONDS.toMillis(1));
+    processedOperationCountSinceLastPeriodicCheck = 0;
+  }
+
+  /**
+   * Any unrecoverable error in a single writer will mark the whole 
TableDiskUsageCache as failed
+   * and disable further operations.
+   */
+  protected void failedToRecover(Exception e) {
+    failedToRecover = true;
+    LOGGER.error("Failed to recover TableDiskUsageCache", e);
+  }
+
+  protected void 
syncTsFileTableSizeCacheIfNecessary(DataRegionTableSizeCacheWriter writer) {
+    try {
+      writer.tsFileCacheWriter.syncIfNecessary();
+    } catch (IOException e) {
+      LOGGER.warn("Failed to sync tsfile table size cache.", e);
+    }
+  }
+
+  // Hook for subclasses to persist pending object table size deltas. No-op by 
default.
+  protected void 
persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
+
+  protected void compactIfNecessary(long maxRunTime) {
+    if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      if (System.currentTimeMillis() - startTime > maxRunTime) {
+        break;
+      }
+      if (writer.getActiveReaderNum() > 0) {
+        continue;
+      }
+      writer.compactIfNecessary();
+    }
+  }
+
+  protected void checkAndMayCloseIdleWriter() {
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      writer.closeIfIdle();
+    }
+  }
+
+  public void write(String database, TsFileID tsFileID, Map<String, Long> 
tableSizeMap) {
+    if (tableSizeMap == null || tableSizeMap.isEmpty()) {
+      // tree model
+      return;
+    }
+    addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap));
+  }
+
+  public void write(String database, TsFileID originTsFileID, TsFileID 
newTsFileID) {
+    addOperationToQueue(new ReplaceTsFileOperation(database, originTsFileID, 
newTsFileID));
+  }
+
+  public void writeObjectDelta(
+      String database, int regionId, long timePartition, String table, long 
size, int num) {
+    throw new UnsupportedOperationException("writeObjectDelta");
+  }
+
+  public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
+      DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+    StartReadOperation operation =
+        new StartReadOperation(dataRegion, readTsFileCache, 
readObjectFileCache);
+    if (!addOperationToQueue(operation)) {
+      operation.future.complete(
+          new Pair<>(
+              new TsFileTableSizeCacheReader(0, null, 0, null, 
dataRegion.getDataRegionId()),
+              new EmptyObjectTableSizeCacheReader()));
+    }
+    return operation.future;
+  }
+
+  public void endRead(DataRegion dataRegion) {
+    EndReadOperation operation = new EndReadOperation(dataRegion);
+    addOperationToQueue(operation);
+  }
+
+  public void registerRegion(DataRegion region) {
+    RegisterRegionOperation operation = new RegisterRegionOperation(region);
+    if (!region.isTableModel()) {
+      return;
+    }
+    addOperationToQueue(operation);

Review Comment:
   why ignore the return value of `addOperationToQueue`? what if it's false? 
and the future in `RegisterRegionOperation` is never used.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());

Review Comment:
   ```suggestion
               ThreadName.TABLE_SIZE_INDEX_RECORD.getName());
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
+            performPeriodicMaintenance();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          LOGGER.error("Meet exception when apply TableDiskUsageCache 
operation.", e);
+        }
+      }
+    } finally {
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+    }
+  }
+
+  private void performPeriodicMaintenance() {
+    checkAndMayCloseIdleWriter();
+    compactIfNecessary(TimeUnit.SECONDS.toMillis(1));
+    processedOperationCountSinceLastPeriodicCheck = 0;
+  }
+
+  /**
+   * Any unrecoverable error in a single writer will mark the whole 
TableDiskUsageCache as failed
+   * and disable further operations.
+   */
+  protected void failedToRecover(Exception e) {
+    failedToRecover = true;
+    LOGGER.error("Failed to recover TableDiskUsageCache", e);
+  }
+
+  protected void 
syncTsFileTableSizeCacheIfNecessary(DataRegionTableSizeCacheWriter writer) {
+    try {
+      writer.tsFileCacheWriter.syncIfNecessary();
+    } catch (IOException e) {
+      LOGGER.warn("Failed to sync tsfile table size cache.", e);
+    }
+  }
+
+  // Hook for subclasses to persist pending object table size deltas. No-op by 
default.
+  protected void 
persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
+
+  protected void compactIfNecessary(long maxRunTime) {
+    if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      if (System.currentTimeMillis() - startTime > maxRunTime) {
+        break;
+      }
+      if (writer.getActiveReaderNum() > 0) {
+        continue;
+      }
+      writer.compactIfNecessary();
+    }
+  }
+
+  protected void checkAndMayCloseIdleWriter() {
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      writer.closeIfIdle();
+    }
+  }
+
+  public void write(String database, TsFileID tsFileID, Map<String, Long> 
tableSizeMap) {
+    if (tableSizeMap == null || tableSizeMap.isEmpty()) {
+      // tree model
+      return;
+    }
+    addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap));
+  }
+
+  public void write(String database, TsFileID originTsFileID, TsFileID 
newTsFileID) {
+    addOperationToQueue(new ReplaceTsFileOperation(database, originTsFileID, 
newTsFileID));
+  }
+
+  public void writeObjectDelta(
+      String database, int regionId, long timePartition, String table, long 
size, int num) {
+    throw new UnsupportedOperationException("writeObjectDelta");
+  }
+
+  public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
+      DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+    StartReadOperation operation =
+        new StartReadOperation(dataRegion, readTsFileCache, 
readObjectFileCache);
+    if (!addOperationToQueue(operation)) {
+      operation.future.complete(
+          new Pair<>(
+              new TsFileTableSizeCacheReader(0, null, 0, null, 
dataRegion.getDataRegionId()),
+              new EmptyObjectTableSizeCacheReader()));
+    }
+    return operation.future;
+  }
+
+  public void endRead(DataRegion dataRegion) {
+    EndReadOperation operation = new EndReadOperation(dataRegion);
+    addOperationToQueue(operation);
+  }
+
+  public void registerRegion(DataRegion region) {
+    RegisterRegionOperation operation = new RegisterRegionOperation(region);
+    if (!region.isTableModel()) {
+      return;
+    }
+    addOperationToQueue(operation);
+  }
+
+  public void remove(String database, int regionId) {
+    RemoveRegionOperation operation = new RemoveRegionOperation(database, 
regionId);
+    if (!addOperationToQueue(operation)) {
+      return;
+    }
+    try {
+      operation.future.get(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOGGER.error("Meet exception when remove TableDiskUsageCache.", e);
+    }
+  }
+
+  protected boolean addOperationToQueue(Operation operation) {
+    if (failedToRecover || stop) {
+      return false;
+    }
+    try {
+      queue.put(operation);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    return true;
+  }
+
+  public int getQueueSize() {
+    return queue.size();
+  }
+
+  public void close() {
+    if (scheduledExecutorService == null) {
+      return;
+    }
+    try {
+      stop = true;
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+      writerMap.clear();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @TestOnly
+  public void ensureRunning() {
+    stop = false;
+    failedToRecover = false;
+    if (scheduledExecutorService.isTerminated()) {
+      scheduledExecutorService =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.FILE_TIME_INDEX_RECORD.getName());
+      scheduledExecutorService.submit(this::run);
+    }
+  }
+
+  protected DataRegionTableSizeCacheWriter createWriter(
+      String database, int regionId, DataRegion region) {
+    return new DataRegionTableSizeCacheWriter(database, regionId, region);
+  }
+
+  protected TsFileTableSizeCacheReader createTsFileCacheReader(
+      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
+    TsFileTableDiskUsageCacheWriter tsFileCacheWriter = 
dataRegionWriter.tsFileCacheWriter;
+    return new TsFileTableSizeCacheReader(
+        tsFileCacheWriter.keyFileLength(),
+        tsFileCacheWriter.getKeyFile(),
+        tsFileCacheWriter.valueFileLength(),
+        tsFileCacheWriter.getValueFile(),
+        regionId);
+  }
+
+  protected IObjectTableSizeCacheReader createObjectFileCacheReader(
+      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
+    return new EmptyObjectTableSizeCacheReader();
+  }
+
+  protected abstract static class Operation {
+    protected final String database;
+    protected final int regionId;
+
+    protected Operation(String database, int regionId) {
+      this.database = database;
+      this.regionId = regionId;
+    }
+
+    public int getRegionId() {
+      return regionId;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+
+    public abstract void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException;
+  }
+
+  protected static class StartReadOperation extends Operation {
+    protected final DataRegion region;
+    protected final boolean readTsFileCache;
+    protected final boolean readObjectFileCache;
+    public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> future =

Review Comment:
   ```suggestion
       private final CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> future =
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
  /**
   * Worker loop executed on the single scheduled thread: flushes per-region writer state,
   * then drains {@link #queue} one operation at a time. Periodic maintenance runs whenever
   * the queue is idle (poll timed out) or after every 1000 processed operations. The loop
   * exits when {@link #close()} sets {@code stop} or the thread is interrupted; all writers
   * are closed on the way out.
   */
  protected void run() {
    try {
      while (!stop) {
        try {
          // Flush buffered per-region state first so readers opened by the next
          // operation observe up-to-date caches.
          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
            syncTsFileTableSizeCacheIfNecessary(writer);
            persistPendingObjectDeltasIfNecessary(writer);
          }
          // Bounded wait so the loop can notice `stop` and run idle-time maintenance.
          Operation operation = queue.poll(1, TimeUnit.SECONDS);
          if (operation != null) {
            operation.apply(this);
            processedOperationCountSinceLastPeriodicCheck++;
          }
          if (operation == null || processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
            performPeriodicMaintenance();
          }
        } catch (InterruptedException e) {
          // Restore interrupt status and let the finally block clean up.
          Thread.currentThread().interrupt();
          return;
        } catch (Exception e) {
          // Keep the worker alive on per-operation failures.
          LOGGER.error("Meet exception when apply TableDiskUsageCache operation.", e);
        }
      }
    } finally {
      // NOTE(review): close() also closes these writers after awaitTermination —
      // confirm DataRegionTableSizeCacheWriter.close() is idempotent.
      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
    }
  }
+
  /**
   * Idle-time housekeeping: close writers that have been idle, compact for at most one
   * second, then reset the per-cycle operation counter.
   */
  private void performPeriodicMaintenance() {
    checkAndMayCloseIdleWriter();
    compactIfNecessary(TimeUnit.SECONDS.toMillis(1));
    processedOperationCountSinceLastPeriodicCheck = 0;
  }
+
  /**
   * Any unrecoverable error in a single writer will mark the whole TableDiskUsageCache as failed
   * and disable further operations ({@link #addOperationToQueue} rejects once the flag is set).
   *
   * @param e the unrecoverable recovery error, logged for diagnosis
   */
  protected void failedToRecover(Exception e) {
    failedToRecover = true;
    LOGGER.error("Failed to recover TableDiskUsageCache", e);
  }
+
  /**
   * Best-effort sync of one region's tsfile table-size cache to disk. An I/O failure is
   * logged and swallowed deliberately so the worker loop keeps running.
   */
  protected void syncTsFileTableSizeCacheIfNecessary(DataRegionTableSizeCacheWriter writer) {
    try {
      writer.tsFileCacheWriter.syncIfNecessary();
    } catch (IOException e) {
      LOGGER.warn("Failed to sync tsfile table size cache.", e);
    }
  }
+
  /**
   * Hook for subclasses to persist pending object table size deltas. No-op by default:
   * the base class tracks only tsfile sizes.
   */
  protected void persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
+
+  protected void compactIfNecessary(long maxRunTime) {
+    if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      if (System.currentTimeMillis() - startTime > maxRunTime) {
+        break;
+      }
+      if (writer.getActiveReaderNum() > 0) {
+        continue;
+      }
+      writer.compactIfNecessary();
+    }
+  }
+
+  protected void checkAndMayCloseIdleWriter() {
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      writer.closeIfIdle();
+    }
+  }
+
+  public void write(String database, TsFileID tsFileID, Map<String, Long> 
tableSizeMap) {
+    if (tableSizeMap == null || tableSizeMap.isEmpty()) {
+      // tree model
+      return;
+    }
+    addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap));
+  }
+
  /**
   * Enqueues a rename of the cache entry for {@code originTsFileID} to {@code newTsFileID}
   * — presumably when compaction or a move produces a new file for the same data;
   * TODO confirm against ReplaceTsFileOperation's callers.
   */
  public void write(String database, TsFileID originTsFileID, TsFileID newTsFileID) {
    addOperationToQueue(new ReplaceTsFileOperation(database, originTsFileID, newTsFileID));
  }
+
  /**
   * Records an object-storage size delta for a table. Unsupported in the base class;
   * subclasses that track object files are expected to override this.
   *
   * @throws UnsupportedOperationException always, in this base implementation
   */
  public void writeObjectDelta(
      String database, int regionId, long timePartition, String table, long size, int num) {
    throw new UnsupportedOperationException("writeObjectDelta");
  }
+
+  public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
+      DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+    StartReadOperation operation =
+        new StartReadOperation(dataRegion, readTsFileCache, 
readObjectFileCache);
+    if (!addOperationToQueue(operation)) {
+      operation.future.complete(
+          new Pair<>(
+              new TsFileTableSizeCacheReader(0, null, 0, null, 
dataRegion.getDataRegionId()),
+              new EmptyObjectTableSizeCacheReader()));
+    }
+    return operation.future;
+  }
+
+  public void endRead(DataRegion dataRegion) {
+    EndReadOperation operation = new EndReadOperation(dataRegion);
+    addOperationToQueue(operation);
+  }
+
+  public void registerRegion(DataRegion region) {
+    RegisterRegionOperation operation = new RegisterRegionOperation(region);
+    if (!region.isTableModel()) {
+      return;
+    }
+    addOperationToQueue(operation);
+  }
+
+  public void remove(String database, int regionId) {
+    RemoveRegionOperation operation = new RemoveRegionOperation(database, 
regionId);
+    if (!addOperationToQueue(operation)) {
+      return;
+    }
+    try {
+      operation.future.get(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOGGER.error("Meet exception when remove TableDiskUsageCache.", e);
+    }
+  }
+
+  protected boolean addOperationToQueue(Operation operation) {
+    if (failedToRecover || stop) {
+      return false;
+    }
+    try {
+      queue.put(operation);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    return true;
+  }
+
  /** Returns the number of operations currently waiting for the worker thread. */
  public int getQueueSize() {
    return queue.size();
  }
+
  /**
   * Stops the worker thread and closes all writers. Safe to call when the executor was
   * never created; waits at most five seconds for the worker to terminate.
   */
  public void close() {
    if (scheduledExecutorService == null) {
      return;
    }
    try {
      stop = true;
      scheduledExecutorService.shutdown();
      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
      // NOTE(review): run()'s finally block also closes these writers, and if
      // awaitTermination timed out the worker may still be using them here —
      // confirm writer close() is idempotent and safe under this race.
      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
      writerMap.clear();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
+
+  @TestOnly
+  public void ensureRunning() {
+    stop = false;
+    failedToRecover = false;
+    if (scheduledExecutorService.isTerminated()) {
+      scheduledExecutorService =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.FILE_TIME_INDEX_RECORD.getName());
+      scheduledExecutorService.submit(this::run);
+    }
+  }
+
  /** Factory hook so subclasses can supply a writer with extra capabilities. */
  protected DataRegionTableSizeCacheWriter createWriter(
      String database, int regionId, DataRegion region) {
    return new DataRegionTableSizeCacheWriter(database, regionId, region);
  }
+
+  protected TsFileTableSizeCacheReader createTsFileCacheReader(
+      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
+    TsFileTableDiskUsageCacheWriter tsFileCacheWriter = 
dataRegionWriter.tsFileCacheWriter;
+    return new TsFileTableSizeCacheReader(
+        tsFileCacheWriter.keyFileLength(),
+        tsFileCacheWriter.getKeyFile(),
+        tsFileCacheWriter.valueFileLength(),
+        tsFileCacheWriter.getValueFile(),
+        regionId);
+  }
+
  /**
   * Factory hook for object-storage size readers. The base class tracks no object
   * files, so it always returns an empty reader.
   */
  protected IObjectTableSizeCacheReader createObjectFileCacheReader(
      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
    return new EmptyObjectTableSizeCacheReader();
  }
+
  /**
   * A unit of work executed on the single worker thread. Keyed by the target data
   * region so implementations can look up the region's writer in {@code writerMap}.
   */
  protected abstract static class Operation {
    // Database the target region belongs to.
    protected final String database;
    // Identifier of the target data region.
    protected final int regionId;

    protected Operation(String database, int regionId) {
      this.database = database;
      this.regionId = regionId;
    }

    public int getRegionId() {
      return regionId;
    }

    public String getDatabase() {
      return database;
    }

    /** Executes this operation; only ever called from the single worker thread. */
    public abstract void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException;
  }
+
+  protected static class StartReadOperation extends Operation {
+    protected final DataRegion region;
+    protected final boolean readTsFileCache;
+    protected final boolean readObjectFileCache;
+    public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> future =
+        new CompletableFuture<>();
+
+    public StartReadOperation(
+        DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
+      this.region = dataRegion;
+      this.readTsFileCache = readTsFileCache;
+      this.readObjectFileCache = readObjectFileCache;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter writer = 
tableDiskUsageCache.writerMap.get(regionId);
+      try {
+        if (writer == null || writer.getRemovedFuture() != null) {
+          // region is removing or removed
+          future.complete(
+              new Pair<>(
+                  new TsFileTableSizeCacheReader(0, null, 0, null, regionId),
+                  new EmptyObjectTableSizeCacheReader()));
+          return;
+        }
+        writer.increaseActiveReaderNum();
+        // Flush buffered writes to ensure readers observe a consistent 
snapshot
+        writer.flush();
+        TsFileTableSizeCacheReader tsFileTableSizeCacheReader =
+            readTsFileCache ? 
tableDiskUsageCache.createTsFileCacheReader(writer, regionId) : null;
+        IObjectTableSizeCacheReader objectTableSizeCacheReader =
+            readObjectFileCache
+                ? tableDiskUsageCache.createObjectFileCacheReader(writer, 
regionId)
+                : null;
+        future.complete(new Pair<>(tsFileTableSizeCacheReader, 
objectTableSizeCacheReader));
+      } catch (Throwable t) {
+        future.completeExceptionally(t);
+      }
+    }
+  }
+
  /**
   * Releases one reader reference on a region's writer, and finishes a pending removal
   * once readers are gone.
   */
  private static class EndReadOperation extends Operation {
    // The exact DataRegion instance the read started on; compared by identity below.
    protected final DataRegion region;

    public EndReadOperation(DataRegion dataRegion) {
      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
      this.region = dataRegion;
    }

    @Override
    public void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException {
      tableDiskUsageCache.writerMap.computeIfPresent(
          regionId,
          (k, writer) -> {
            // Identity check: if a region was re-registered under the same id, this
            // end-read belongs to the old instance and must not touch the new writer.
            if (writer.dataRegion != region) {
              return writer;
            }
            writer.decreaseActiveReaderNum();
            // Complete pending remove when the last reader exits.
            // NOTE(review): this branch fires whenever a removal is pending, without
            // checking getActiveReaderNum() == 0 — confirm removals are only scheduled
            // once the count has reached zero, or readers may be closed out from under.
            if (writer.getRemovedFuture() != null) {
              writer.close();
              writer.getRemovedFuture().complete(null);
              writer.setRemovedFuture(null);
              // Returning null removes the mapping from writerMap.
              return null;
            }
            return writer;
          });
    }
  }
+
+  private static class WriteOperation extends Operation {
+
+    private final TsFileID tsFileID;
+    private final Map<String, Long> tableSizeMap;
+
+    protected WriteOperation(String database, TsFileID tsFileID, Map<String, 
Long> tableSizeMap) {
+      super(database, tsFileID.regionId);
+      this.tsFileID = tsFileID;
+      this.tableSizeMap = tableSizeMap;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter dataRegionTableSizeCacheWriter =
+          tableDiskUsageCache.writerMap.get(regionId);
+      if (dataRegionTableSizeCacheWriter != null) {
+        dataRegionTableSizeCacheWriter.tsFileCacheWriter.write(tsFileID, 
tableSizeMap);
+      }

Review Comment:
   The else branch should log a warning or error — silently dropping this write is a critical bug.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.IObjectTableSizeCacheReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TableDiskUsageCache {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TableDiskUsageCache.class);
+  protected final BlockingQueue<Operation> queue = new 
LinkedBlockingQueue<>(1000);
+  // regionId -> writer mapping
+  protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
+  protected ScheduledExecutorService scheduledExecutorService;
+  private int processedOperationCountSinceLastPeriodicCheck = 0;
+  protected volatile boolean failedToRecover = false;
+  private volatile boolean stop = false;
+
+  protected TableDiskUsageCache() {
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService.submit(this::run);
+  }
+
+  protected void run() {
+    try {
+      while (!stop) {
+        try {
+          for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+            syncTsFileTableSizeCacheIfNecessary(writer);
+            persistPendingObjectDeltasIfNecessary(writer);
+          }
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            processedOperationCountSinceLastPeriodicCheck++;
+          }
+          if (operation == null || 
processedOperationCountSinceLastPeriodicCheck % 1000 == 0) {
+            performPeriodicMaintenance();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          LOGGER.error("Meet exception when apply TableDiskUsageCache 
operation.", e);
+        }
+      }
+    } finally {
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+    }
+  }
+
+  private void performPeriodicMaintenance() {
+    checkAndMayCloseIdleWriter();
+    compactIfNecessary(TimeUnit.SECONDS.toMillis(1));
+    processedOperationCountSinceLastPeriodicCheck = 0;
+  }
+
+  /**
+   * Any unrecoverable error in a single writer will mark the whole 
TableDiskUsageCache as failed
+   * and disable further operations.
+   */
+  protected void failedToRecover(Exception e) {
+    failedToRecover = true;
+    LOGGER.error("Failed to recover TableDiskUsageCache", e);
+  }
+
+  protected void 
syncTsFileTableSizeCacheIfNecessary(DataRegionTableSizeCacheWriter writer) {
+    try {
+      writer.tsFileCacheWriter.syncIfNecessary();
+    } catch (IOException e) {
+      LOGGER.warn("Failed to sync tsfile table size cache.", e);
+    }
+  }
+
+  // Hook for subclasses to persist pending object table size deltas. No-op by 
default.
+  protected void 
persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
+
+  protected void compactIfNecessary(long maxRunTime) {
+    if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      if (System.currentTimeMillis() - startTime > maxRunTime) {
+        break;
+      }
+      if (writer.getActiveReaderNum() > 0) {
+        continue;
+      }
+      writer.compactIfNecessary();
+    }
+  }
+
+  protected void checkAndMayCloseIdleWriter() {
+    for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+      writer.closeIfIdle();
+    }
+  }
+
+  public void write(String database, TsFileID tsFileID, Map<String, Long> 
tableSizeMap) {
+    if (tableSizeMap == null || tableSizeMap.isEmpty()) {
+      // tree model
+      return;
+    }
+    addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap));
+  }
+
+  public void write(String database, TsFileID originTsFileID, TsFileID 
newTsFileID) {
+    addOperationToQueue(new ReplaceTsFileOperation(database, originTsFileID, 
newTsFileID));
+  }
+
+  public void writeObjectDelta(
+      String database, int regionId, long timePartition, String table, long 
size, int num) {
+    throw new UnsupportedOperationException("writeObjectDelta");
+  }
+
+  public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
+      DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+    StartReadOperation operation =
+        new StartReadOperation(dataRegion, readTsFileCache, 
readObjectFileCache);
+    if (!addOperationToQueue(operation)) {
+      operation.future.complete(
+          new Pair<>(
+              new TsFileTableSizeCacheReader(0, null, 0, null, 
dataRegion.getDataRegionId()),
+              new EmptyObjectTableSizeCacheReader()));
+    }
+    return operation.future;
+  }
+
+  public void endRead(DataRegion dataRegion) {
+    EndReadOperation operation = new EndReadOperation(dataRegion);
+    addOperationToQueue(operation);
+  }
+
+  public void registerRegion(DataRegion region) {
+    RegisterRegionOperation operation = new RegisterRegionOperation(region);
+    if (!region.isTableModel()) {
+      return;
+    }
+    addOperationToQueue(operation);
+  }
+
+  public void remove(String database, int regionId) {
+    RemoveRegionOperation operation = new RemoveRegionOperation(database, 
regionId);
+    if (!addOperationToQueue(operation)) {
+      return;
+    }
+    try {
+      operation.future.get(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOGGER.error("Meet exception when remove TableDiskUsageCache.", e);
+    }
+  }
+
+  protected boolean addOperationToQueue(Operation operation) {
+    if (failedToRecover || stop) {
+      return false;
+    }
+    try {
+      queue.put(operation);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    return true;
+  }
+
  /** Returns the number of operations currently waiting for the maintenance thread. */
  public int getQueueSize() {
    return queue.size();
  }
+
+  public void close() {
+    if (scheduledExecutorService == null) {
+      return;
+    }
+    try {
+      stop = true;
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+      writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+      writerMap.clear();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @TestOnly
+  public void ensureRunning() {
+    stop = false;
+    failedToRecover = false;
+    if (scheduledExecutorService.isTerminated()) {
+      scheduledExecutorService =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.FILE_TIME_INDEX_RECORD.getName());
+      scheduledExecutorService.submit(this::run);
+    }
+  }
+
  /**
   * Factory hook creating the per-region cache writer; subclasses may return a specialized
   * writer (e.g. one that also tracks object-storage sizes).
   */
  protected DataRegionTableSizeCacheWriter createWriter(
      String database, int regionId, DataRegion region) {
    return new DataRegionTableSizeCacheWriter(database, regionId, region);
  }
+
+  protected TsFileTableSizeCacheReader createTsFileCacheReader(
+      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
+    TsFileTableDiskUsageCacheWriter tsFileCacheWriter = 
dataRegionWriter.tsFileCacheWriter;
+    return new TsFileTableSizeCacheReader(
+        tsFileCacheWriter.keyFileLength(),
+        tsFileCacheWriter.getKeyFile(),
+        tsFileCacheWriter.valueFileLength(),
+        tsFileCacheWriter.getValueFile(),
+        regionId);
+  }
+
  /**
   * Factory hook for the object-storage size reader. The base class keeps no object cache
   * and always returns an empty reader; subclasses override to supply a real one.
   */
  protected IObjectTableSizeCacheReader createObjectFileCacheReader(
      DataRegionTableSizeCacheWriter dataRegionWriter, int regionId) {
    return new EmptyObjectTableSizeCacheReader();
  }
+
+  protected abstract static class Operation {
+    protected final String database;
+    protected final int regionId;
+
+    protected Operation(String database, int regionId) {
+      this.database = database;
+      this.regionId = regionId;
+    }
+
+    public int getRegionId() {
+      return regionId;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+
+    public abstract void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException;
+  }
+
+  protected static class StartReadOperation extends Operation {
+    protected final DataRegion region;
+    protected final boolean readTsFileCache;
+    protected final boolean readObjectFileCache;
+    public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> future =
+        new CompletableFuture<>();
+
+    public StartReadOperation(
+        DataRegion dataRegion, boolean readTsFileCache, boolean 
readObjectFileCache) {
+      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
+      this.region = dataRegion;
+      this.readTsFileCache = readTsFileCache;
+      this.readObjectFileCache = readObjectFileCache;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter writer = 
tableDiskUsageCache.writerMap.get(regionId);
+      try {
+        if (writer == null || writer.getRemovedFuture() != null) {
+          // region is removing or removed
+          future.complete(
+              new Pair<>(
+                  new TsFileTableSizeCacheReader(0, null, 0, null, regionId),
+                  new EmptyObjectTableSizeCacheReader()));
+          return;
+        }
+        writer.increaseActiveReaderNum();
+        // Flush buffered writes to ensure readers observe a consistent 
snapshot
+        writer.flush();
+        TsFileTableSizeCacheReader tsFileTableSizeCacheReader =
+            readTsFileCache ? 
tableDiskUsageCache.createTsFileCacheReader(writer, regionId) : null;
+        IObjectTableSizeCacheReader objectTableSizeCacheReader =
+            readObjectFileCache
+                ? tableDiskUsageCache.createObjectFileCacheReader(writer, 
regionId)
+                : null;
+        future.complete(new Pair<>(tsFileTableSizeCacheReader, 
objectTableSizeCacheReader));
+      } catch (Throwable t) {
+        future.completeExceptionally(t);
+      }
+    }
+  }
+
  /**
   * Releases one reader on a region writer; if a removal is pending on that writer, the
   * writer is closed, the removal future completed, and the mapping dropped.
   */
  private static class EndReadOperation extends Operation {
    protected final DataRegion region;

    public EndReadOperation(DataRegion dataRegion) {
      super(dataRegion.getDatabaseName(), dataRegion.getDataRegionId());
      this.region = dataRegion;
    }

    @Override
    public void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException {
      // computeIfPresent lets us atomically drop the mapping when the pending removal
      // completes (returning null removes the entry).
      tableDiskUsageCache.writerMap.computeIfPresent(
          regionId,
          (k, writer) -> {
            // Identity check: the region id may have been reused by a newer DataRegion
            // instance; only the matching instance may release readers on this writer.
            if (writer.dataRegion != region) {
              return writer;
            }
            writer.decreaseActiveReaderNum();
            // Complete pending remove when the last reader exits
            // NOTE(review): no getActiveReaderNum() == 0 check here — if multiple readers
            // can be active while a remove is pending, the first endRead would close the
            // writer under the remaining readers. Confirm removal cannot race with >1 reader.
            if (writer.getRemovedFuture() != null) {
              writer.close();
              writer.getRemovedFuture().complete(null);
              writer.setRemovedFuture(null);
              return null;
            }
            return writer;
          });
    }
  }
+
+  private static class WriteOperation extends Operation {
+
+    private final TsFileID tsFileID;
+    private final Map<String, Long> tableSizeMap;
+
+    protected WriteOperation(String database, TsFileID tsFileID, Map<String, 
Long> tableSizeMap) {
+      super(database, tsFileID.regionId);
+      this.tsFileID = tsFileID;
+      this.tableSizeMap = tableSizeMap;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter dataRegionTableSizeCacheWriter =
+          tableDiskUsageCache.writerMap.get(regionId);
+      if (dataRegionTableSizeCacheWriter != null) {
+        dataRegionTableSizeCacheWriter.tsFileCacheWriter.write(tsFileID, 
tableSizeMap);
+      }
+    }
+  }
+
+  private static class ReplaceTsFileOperation extends Operation {
+    private final TsFileID originTsFileID;
+    private final TsFileID newTsFileID;
+
+    public ReplaceTsFileOperation(String database, TsFileID originTsFileID, 
TsFileID newTsFileID) {
+      super(database, originTsFileID.regionId);
+      this.originTsFileID = originTsFileID;
+      this.newTsFileID = newTsFileID;
+    }
+
+    @Override
+    public void apply(TableDiskUsageCache tableDiskUsageCache) throws 
IOException {
+      DataRegionTableSizeCacheWriter writer = 
tableDiskUsageCache.writerMap.get(regionId);
+      if (writer != null) {
+        writer.tsFileCacheWriter.write(originTsFileID, newTsFileID);
+      }

Review Comment:
   Shouldn't the `else` branch print a warn or error log? Silently dropping the replacement when the writer is missing would hide a critical bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to