Copilot commented on code in PR #17203:
URL: https://github.com/apache/iotdb/pull/17203#discussion_r2802105595


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -2384,4 +2390,12 @@ private void logFlushQueryReadUnlocked() {
   public String toString() {
     return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}';
   }
+
+  public Future<?> getCloseFuture() {
+    return closeFuture;
+  }
+

Review Comment:
   The public methods getCloseFuture() and setCloseFuture() lack documentation. 
They should have JavaDoc comments explaining their purpose, particularly the 
thread-safety guarantees and lifecycle of the closeFuture field. Additionally, 
setCloseFuture() appears to be unused in the codebase. If it is intended for 
future use or testing, that should be documented; otherwise it should be 
removed or made package-private.
   ```suggestion
   
     /**
      * Returns the {@link Future} that represents the asynchronous close operation of this
      * {@link TsFileProcessor}.
      *
      * <p>The returned future is typically completed when all pending writes, flushes, and resource
      * releases associated with this processor have finished. Callers may use this future to wait for
      * the processor to be fully closed or to chain follow-up actions.
      *
      * <p>Thread-safety: this method performs no additional synchronization. The {@code closeFuture}
      * reference may be updated concurrently by other threads, and this method may return {@code null}
      * if a close operation has not yet been scheduled. Callers are responsible for ensuring an
      * appropriate happens-before relationship if they require stronger guarantees.
      *
      * @return the future that completes when this processor is closed, or {@code null} if no close
      *     operation has been registered yet
      */
     public Future<?> getCloseFuture() {
       return closeFuture;
     }
   
     /**
      * Sets the {@link Future} that tracks completion of this processor's close operation.
      *
      * <p>This method is intended to be invoked by code that manages the lifecycle of
      * {@link TsFileProcessor} instances (for example, when scheduling an asynchronous close) and in
      * tests where a custom {@code Future} is injected to observe or control the close lifecycle.
      *
      * <p>Thread-safety: this method does not perform synchronization. If multiple threads may update
      * or read {@code closeFuture}, callers must provide any necessary external synchronization or
      * rely on higher-level concurrency guarantees.
      *
      * @param closeFuture the future representing the close operation for this processor; may be
      *     {@code null} to indicate that no close operation is currently registered
      */
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
     return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
   }
 
+  public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+    writeLock("asyncCloseOneTsFileProcessor");
+    try {
+      return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /**
    * close one tsfile processor, thread-safety should be ensured by caller
    *
    * @param sequence whether this tsfile processor is sequence or not
    * @param tsFileProcessor tsfile processor
    */
   public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
-    // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
-    // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
-        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
-        || tsFileProcessor.alreadyMarkedClosing()) {
+    if (tsFileProcessor == null) {
       return CompletableFuture.completedFuture(null);
     }
-    Future<?> future;
-    if (sequence) {
-      closingSequenceTsFileProcessor.add(tsFileProcessor);
-      future = tsFileProcessor.asyncClose();
-      if (future.isDone()) {
-        closingSequenceTsFileProcessor.remove(tsFileProcessor);
-      }
+    if (tsFileProcessor.getCloseFuture() != null) {
+      return tsFileProcessor.getCloseFuture();
+    }
 
-      workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
-    } else {
-      closingUnSequenceTsFileProcessor.add(tsFileProcessor);
-      future = tsFileProcessor.asyncClose();
-      if (future.isDone()) {
-        closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
-      }
+    Future<?> future;
+    Set<TsFileProcessor> closingTsFileProcessors =
+        sequence ? closingSequenceTsFileProcessor : closingUnSequenceTsFileProcessor;
+    TreeMap<Long, TsFileProcessor> workTsFileProcessors =
+        sequence ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
 
-      workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+    closingTsFileProcessors.add(tsFileProcessor);
+    future = tsFileProcessor.asyncClose();
+    if (future.isDone()) {
+      closingTsFileProcessors.remove(tsFileProcessor);
     }
+    workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());

Review Comment:
   Removing the check for whether the processor is already in the 
closingSequenceTsFileProcessor or closingUnSequenceTsFileProcessor set may 
allow duplicate close attempts. In the old code, a processor that was already 
in either set (or already marked as closing) caused an early return. Now the 
only guard is whether closeFuture is set. However, there is a window between 
when a processor is added to closingTsFileProcessors (line 1971) and when 
closeFuture is set inside asyncClose(). During this window, a second call to 
asyncCloseOneTsFileProcessor could pass the closeFuture null check and add the 
processor to the closing set again.
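
One way to close that window is to make "is this processor already closing?" and "which future tracks it?" a single atomic lookup. The following is a minimal sketch under stated assumptions, not the DataRegion implementation: `ClosingRegistrySketch`, the string processor ids, and `startClose` are hypothetical stand-ins, and the close work is replaced by an already-completed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry of closing processors, keyed by an illustrative id.
// It is not DataRegion; it only models the "closing set plus future" bookkeeping.
class ClosingRegistrySketch {
  private final ConcurrentMap<String, Future<?>> closing = new ConcurrentHashMap<>();
  private final AtomicInteger closesStarted = new AtomicInteger();

  // computeIfAbsent runs the mapping function at most once per absent key,
  // so membership and the tracked future are published together.
  Future<?> asyncCloseOne(String processorId) {
    return closing.computeIfAbsent(processorId, id -> startClose(id));
  }

  // Placeholder for the real asynchronous close.
  private Future<?> startClose(String processorId) {
    closesStarted.incrementAndGet();
    return CompletableFuture.completedFuture(null);
  }

  int closesStarted() {
    return closesStarted.get();
  }
}
```

In this sketch entries are never removed; real code would still need to drop a processor from the registry once its close completes, as the existing `future.isDone()` branch does for the closing sets.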



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -210,6 +210,8 @@ public class TsFileProcessor {
 
   private int walEntryNum = 0;
 
+  private Future<?> closeFuture;

Review Comment:
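   The closeFuture field should be declared as volatile to ensure visibility 
across threads. Without volatile, a thread reading closeFuture outside the lock 
may not see the value set by another thread and could wrongly conclude that no 
close has been scheduled. Note that volatile only addresses visibility: the 
null check and the assignment in asyncClose() still need to happen atomically 
(under one lock or via compare-and-set) so that multiple threads cannot pass 
the null check simultaneously and create multiple futures.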
   ```suggestion
     private volatile Future<?> closeFuture;
   ```
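
To illustrate the difference between visibility and atomicity, here is a minimal sketch, assuming a simplified processor. `CloseOnceProcessor` and `closeStarts` are hypothetical names, and the real close work is replaced by completing the future immediately. It uses `AtomicReference.compareAndSet`, which gives the same visibility as a volatile field and also makes the check-then-set step atomic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for TsFileProcessor; only the close-once logic is modeled.
class CloseOnceProcessor {
  // AtomicReference provides volatile-style visibility plus an atomic check-then-set.
  private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
  private final AtomicInteger closeStarts = new AtomicInteger();

  Future<?> asyncClose() {
    CompletableFuture<Void> created = new CompletableFuture<>();
    if (closeFuture.compareAndSet(null, created)) {
      // Only the caller that wins the compare-and-set starts the close work.
      closeStarts.incrementAndGet();
      created.complete(null); // placeholder for the real asynchronous close
      return created;
    }
    // Another caller already published a future; reuse it.
    return closeFuture.get();
  }

  int closeStarts() {
    return closeStarts.get();
  }
}
```

If asyncClose() already runs entirely under a lock, a plain volatile field read by outside callers may be enough; the compare-and-set variant matters when the check and the assignment are not covered by the same lock.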



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -210,6 +210,8 @@ public class TsFileProcessor {
 
   private int walEntryNum = 0;
 
+  private Future<?> closeFuture;

Review Comment:
   The closeFuture field is never cleared after the processor is closed, which 
means it holds a reference to the Future object indefinitely. This could 
potentially cause a memory leak if the Future holds references to large objects 
or contexts. Consider setting closeFuture to null in the endFile() and 
endEmptyFile() methods after the file is successfully closed, or document why 
this reference needs to be retained.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
     return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
   }
 
+  public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+    writeLock("asyncCloseOneTsFileProcessor");
+    try {
+      return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /**
    * close one tsfile processor, thread-safety should be ensured by caller
    *
    * @param sequence whether this tsfile processor is sequence or not
    * @param tsFileProcessor tsfile processor
    */

Review Comment:
   The JavaDoc comment on line 1952 states "thread-safety should be ensured by 
caller", which is accurate for this method. However, the new overload 
asyncCloseOneTsFileProcessor(TsFileResource) at line 1942 acquires its own 
write lock, making it thread-safe. The documentation should clarify which 
overload requires external synchronization and which provides its own.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
     return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
   }
 
+  public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+    writeLock("asyncCloseOneTsFileProcessor");
+    try {
+      return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+    } finally {
+      writeUnlock();
+    }
+  }

Review Comment:
   The new public method asyncCloseOneTsFileProcessor(TsFileResource) lacks 
documentation. It should have a JavaDoc comment explaining its purpose, 
parameters, return value, and the idempotency guarantee mentioned in the PR 
description (i.e., that multiple calls with the same TsFileResource will return 
the same Future).



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java:
##########
@@ -93,9 +93,13 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
 import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+import static org.junit.Assert.assertTrue;

Review Comment:
   The changes add a static import for assertTrue (line 102), while the rest of 
the file consistently uses Assert.assertEquals, Assert.assertFalse, etc. 
without static imports. For consistency, either all assertions should use 
static imports or none should. Here, dropping the static import and calling 
Assert.assertTrue would match the existing style.
   ```suggestion
   
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
     return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
   }
 
+  public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+    writeLock("asyncCloseOneTsFileProcessor");
+    try {
+      return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /**
    * close one tsfile processor, thread-safety should be ensured by caller
    *
    * @param sequence whether this tsfile processor is sequence or not
    * @param tsFileProcessor tsfile processor
    */
   public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
-    // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
-    // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
-        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
-        || tsFileProcessor.alreadyMarkedClosing()) {
+    if (tsFileProcessor == null) {
       return CompletableFuture.completedFuture(null);
     }
-    Future<?> future;
-    if (sequence) {
-      closingSequenceTsFileProcessor.add(tsFileProcessor);
-      future = tsFileProcessor.asyncClose();
-      if (future.isDone()) {
-        closingSequenceTsFileProcessor.remove(tsFileProcessor);
-      }
+    if (tsFileProcessor.getCloseFuture() != null) {
+      return tsFileProcessor.getCloseFuture();
+    }

Review Comment:
   There's a race condition here. The check for closeFuture at line 1961 
happens outside of the TsFileProcessor's flushQueryLock, while the closeFuture 
is set inside that lock in TsFileProcessor.asyncClose(). This means a thread 
could read null from getCloseFuture() here, then another thread could complete 
asyncClose() and set closeFuture, and then the first thread would proceed to 
call tsFileProcessor.asyncClose() again, potentially creating a second future. 
Since closeFuture is not volatile, visibility is also not guaranteed.
   ```suggestion
   
   ```
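
A possible shape for the fix is to perform the null check under the same lock that guards the assignment, so no caller can observe a stale null. The following is a sketch only: `LockedCloseSketch` is a hypothetical class, its `lock` object is a stand-in for flushQueryLock, and `distinctFutures` is an illustrative helper that counts how many different futures concurrent callers receive.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

// Hypothetical stand-in for TsFileProcessor; only the guarded check-then-set is modeled.
class LockedCloseSketch {
  private final Object lock = new Object(); // stand-in for flushQueryLock
  private Future<?> closeFuture; // guarded by lock

  // The null check and the assignment happen under the same lock.
  Future<?> asyncClose() {
    synchronized (lock) {
      if (closeFuture == null) {
        closeFuture = new CompletableFuture<Void>(); // placeholder for the real close
      }
      return closeFuture;
    }
  }

  // Releases several threads at once and counts the distinct futures they observed.
  static int distinctFutures(int threads) {
    LockedCloseSketch processor = new LockedCloseSketch();
    Set<Future<?>> seen = ConcurrentHashMap.newKeySet();
    CountDownLatch start = new CountDownLatch(1);
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        try {
          start.await();
          seen.add(processor.asyncClose());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      workers[i].start();
    }
    start.countDown();
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return seen.size();
  }
}
```

With the check inside the lock, the caller in DataRegion would no longer need its own unsynchronized `getCloseFuture() != null` test; it could call asyncClose() and rely on getting the already-registered future back.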



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java:
##########
@@ -1772,4 +1776,21 @@ public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles()
     Assert.assertFalse(tsFileResourceSeq.anyModFileExists());
     Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
   }
+
+  @Test
+  public void testFlushSpecifiedResource()
+      throws IllegalPathException, WriteProcessException, ExecutionException, InterruptedException {
+    for (int j = 100; j < 200; j++) {
+      TSRecord record = new TSRecord(deviceId, j);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    }
+    TsFileResource tsFileResourceSeq = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+    Future<?> future = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+    Future<?> future2 = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+    assertTrue(future == future2 || future2 instanceof CompletableFuture);

Review Comment:
   The test assertion logic is unclear and potentially incorrect. The condition 
"future == future2 || future2 instanceof CompletableFuture" seems to expect 
either the same future to be returned (future == future2) OR for future2 to be 
a CompletableFuture. However, the PR description states "flushing the same 
TsFile multiple times will result in the same future" which suggests future == 
future2 should always be true. The alternative condition "future2 instanceof 
CompletableFuture" would allow the test to pass even when different futures are 
returned, which defeats the purpose of testing idempotency. This should be 
simplified to just assertTrue(future == future2).
   ```suggestion
       assertTrue(future == future2);
   ```
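
To make the weakness of the combined condition concrete, here is a small self-contained sketch. `AssertionStrengthSketch`, `weakCheck`, and `strictCheck` are hypothetical names that mirror the two conditions discussed above; they are not part of the test file.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical helper that isolates the two conditions from the review comment.
class AssertionStrengthSketch {
  // Mirrors the condition in the PR's test: passes for any CompletableFuture.
  static boolean weakCheck(Future<?> first, Future<?> second) {
    return first == second || second instanceof CompletableFuture;
  }

  // Mirrors the suggested condition: passes only when both are the same object.
  static boolean strictCheck(Future<?> first, Future<?> second) {
    return first == second;
  }
}
```

Given two distinct `CompletableFuture` instances, `weakCheck` returns true while `strictCheck` returns false, so only the strict form detects a broken idempotency guarantee. In JUnit 4, `Assert.assertSame(future, future2)` expresses the same identity check and would also fit the file's existing `Assert.`-prefixed style.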



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
