Copilot commented on code in PR #17203:
URL: https://github.com/apache/iotdb/pull/17203#discussion_r2802105595
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -2384,4 +2390,12 @@ private void logFlushQueryReadUnlocked() {
public String toString() {
return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}';
}
+
+ public Future<?> getCloseFuture() {
+ return closeFuture;
+ }
+
Review Comment:
The public methods getCloseFuture() and setCloseFuture() lack documentation.
They should have JavaDoc comments explaining their purpose, particularly the
thread-safety guarantees and lifecycle of the closeFuture field. Additionally,
the setCloseFuture() method appears to be unused in the codebase - if it's
intended for future use or testing, this should be documented, otherwise it
should be removed or made package-private.
```suggestion
/**
* Returns the {@link Future} that represents the asynchronous close operation of this
* {@link TsFileProcessor}.
*
* <p>The returned future is typically completed when all pending writes, flushes, and resource
* releases associated with this processor have finished. Callers may use this future to wait for
* the processor to be fully closed or to check whether closing has finished.
*
* <p>Thread-safety: this method performs no additional synchronization. The {@code closeFuture}
* reference may be updated concurrently by other threads, and this method may return {@code null}
* if a close operation has not yet been scheduled. Callers are responsible for ensuring an
* appropriate happens-before relationship if they require stronger guarantees.
*
* @return the future that completes when this processor is closed, or {@code null} if no close
* operation has been registered yet
*/
public Future<?> getCloseFuture() {
return closeFuture;
}

/**
* Sets the {@link Future} that tracks completion of this processor's close operation.
*
* <p>This method is intended to be invoked by code that manages the lifecycle of
* {@link TsFileProcessor} instances (for example, when scheduling an asynchronous close) and in
* tests where a custom {@code Future} is injected to observe or control the close lifecycle.
*
* <p>Thread-safety: this method does not perform synchronization. If multiple threads may update
* or read {@code closeFuture}, callers must provide any necessary external synchronization or
* rely on higher-level concurrency guarantees.
*
* @param closeFuture the future representing the close operation for this processor; may be
* {@code null} to indicate that no close operation is currently registered
*/
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
}
+ public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+ writeLock("asyncCloseOneTsFileProcessor");
+ try {
+ return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* close one tsfile processor, thread-safety should be ensured by caller
*
* @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
- // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
- // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
- || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
- || tsFileProcessor.alreadyMarkedClosing()) {
+ if (tsFileProcessor == null) {
return CompletableFuture.completedFuture(null);
}
- Future<?> future;
- if (sequence) {
- closingSequenceTsFileProcessor.add(tsFileProcessor);
- future = tsFileProcessor.asyncClose();
- if (future.isDone()) {
- closingSequenceTsFileProcessor.remove(tsFileProcessor);
- }
+ if (tsFileProcessor.getCloseFuture() != null) {
+ return tsFileProcessor.getCloseFuture();
+ }
- workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
- } else {
- closingUnSequenceTsFileProcessor.add(tsFileProcessor);
- future = tsFileProcessor.asyncClose();
- if (future.isDone()) {
- closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
- }
+ Future<?> future;
+ Set<TsFileProcessor> closingTsFileProcessors =
+ sequence ? closingSequenceTsFileProcessor : closingUnSequenceTsFileProcessor;
+ TreeMap<Long, TsFileProcessor> workTsFileProcessors =
+ sequence ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
- workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+ closingTsFileProcessors.add(tsFileProcessor);
+ future = tsFileProcessor.asyncClose();
+ if (future.isDone()) {
+ closingTsFileProcessors.remove(tsFileProcessor);
}
+ workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
Review Comment:
Removing the early return for processors that are already in
closingSequenceTsFileProcessor or closingUnSequenceTsFileProcessor weakens the
guard against closing the same processor twice. In the old code, if a processor
was already in either set (or alreadyMarkedClosing() returned true), the method
returned early. Now the only check is whether closeFuture is set. However, there
is a window between adding the processor to closingTsFileProcessors (line 1971)
and setting closeFuture inside asyncClose(). During this window, a second call
to asyncCloseOneTsFileProcessor could pass the closeFuture null check, add the
processor to the closing set again, and call asyncClose() a second time.
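One way to close this window, sketched with hypothetical stand-ins (`GuardedProcessor`, `GuardedRegion`) rather than the real IoTDB classes, is to keep both guards and to run the checks, the insertion into the closing set, and the `asyncClose()` call inside a single write-locked critical section. A second caller then finds the published future (or, defensively, the closing-set entry) and never reaches `asyncClose()` again.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for TsFileProcessor: only records how often asyncClose() runs.
class GuardedProcessor {
  private volatile Future<?> closeFuture;
  private int asyncCloseCalls = 0;

  Future<?> getCloseFuture() {
    return closeFuture;
  }

  int asyncCloseCalls() {
    return asyncCloseCalls;
  }

  // Simulates scheduling a close that has not finished yet.
  Future<?> asyncClose() {
    asyncCloseCalls++;
    closeFuture = new CompletableFuture<Void>();
    return closeFuture;
  }
}

// Hypothetical stand-in for the closing bookkeeping in DataRegion.
class GuardedRegion {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Set<GuardedProcessor> closingProcessors = new HashSet<>();

  Future<?> asyncCloseOne(GuardedProcessor processor) {
    lock.writeLock().lock();
    try {
      if (processor == null) {
        return CompletableFuture.completedFuture(null);
      }
      Future<?> existing = processor.getCloseFuture();
      if (existing != null) {
        // A close was already scheduled: hand back the same future.
        return existing;
      }
      if (closingProcessors.contains(processor)) {
        // Defensive: keep the old early return so asyncClose() is never invoked twice.
        return CompletableFuture.completedFuture(null);
      }
      closingProcessors.add(processor);
      Future<?> future = processor.asyncClose();
      if (future.isDone()) {
        closingProcessors.remove(processor);
      }
      return future;
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```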
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -210,6 +210,8 @@ public class TsFileProcessor {
private int walEntryNum = 0;
+ private Future<?> closeFuture;
Review Comment:
The closeFuture field should be declared as volatile to ensure visibility
across threads. Without volatile, one thread reading closeFuture may not see
the value set by another thread, leading to potential race conditions where
multiple threads could simultaneously pass the null check in asyncClose() and
create multiple futures.
```suggestion
private volatile Future<?> closeFuture;
```
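Note that `volatile` only guarantees that a published `closeFuture` is visible to other threads; it does not make the null check and the assignment atomic, so two threads could still both read `null`. A minimal sketch that combines the two with double-checked locking, assuming a hypothetical `VolatileCloseProcessor` whose private monitor stands in for the processor's real `flushQueryLock`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch, not the real TsFileProcessor.
class VolatileCloseProcessor {
  // volatile: unsynchronized readers such as getCloseFuture() see the latest published value.
  private volatile Future<?> closeFuture;
  private final Object closeLock = new Object();
  private int closesScheduled = 0;

  Future<?> getCloseFuture() {
    return closeFuture;
  }

  int closesScheduled() {
    synchronized (closeLock) {
      return closesScheduled;
    }
  }

  Future<?> asyncClose() {
    // Fast path: no locking once the future has been published.
    Future<?> existing = closeFuture;
    if (existing != null) {
      return existing;
    }
    synchronized (closeLock) {
      // Re-check under the lock so that only one thread creates the future.
      if (closeFuture == null) {
        closesScheduled++;
        closeFuture = new CompletableFuture<Void>();
      }
      return closeFuture;
    }
  }
}
```

Readers that only call `getCloseFuture()` never take the lock; the volatile read is what makes that fast path safe.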
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -210,6 +210,8 @@ public class TsFileProcessor {
private int walEntryNum = 0;
+ private Future<?> closeFuture;
Review Comment:
The closeFuture field is never cleared after the processor is closed, which
means it holds a reference to the Future object indefinitely. This could
potentially cause a memory leak if the Future holds references to large objects
or contexts. Consider setting closeFuture to null in the endFile() and
endEmptyFile() methods after the file is successfully closed, or document why
this reference needs to be retained.
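Clearing the field to `null` would also re-open the `getCloseFuture() != null` shortcut in `asyncCloseOneTsFileProcessor`, so a later call could schedule a second close unless another guard catches it. A hedged alternative, sketched on a hypothetical `ReleasingProcessor` whose `onCloseFinished()` stands in for the end of `endFile()`/`endEmptyFile()`, is to swap the original future for a shared, already-completed one: callers still get a done `Future`, while whatever the original future captured becomes collectable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch of releasing the close future's captured state after close.
class ReleasingProcessor {
  // One shared, already-completed future reused by every closed processor.
  private static final Future<?> CLOSED = CompletableFuture.completedFuture(null);

  private volatile Future<?> closeFuture;

  synchronized Future<?> asyncClose() {
    if (closeFuture == null) {
      closeFuture = new CompletableFuture<Void>();
    }
    return closeFuture;
  }

  // Hypothetical hook standing in for the end of endFile()/endEmptyFile().
  synchronized void onCloseFinished() {
    Future<?> pending = closeFuture;
    if (pending instanceof CompletableFuture) {
      // Stands in for the real close task finishing; wakes up anyone waiting on it.
      ((CompletableFuture<?>) pending).complete(null);
    }
    // Drop the original future but keep a non-null, completed one so that
    // later callers do not schedule a second close.
    closeFuture = CLOSED;
  }

  Future<?> getCloseFuture() {
    return closeFuture;
  }
}
```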
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
}
+ public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+ writeLock("asyncCloseOneTsFileProcessor");
+ try {
+ return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* close one tsfile processor, thread-safety should be ensured by caller
*
* @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
Review Comment:
The JavaDoc comment on line 1952 states "thread-safety should be ensured by
caller", which is accurate for this method. However, the new overload
asyncCloseOneTsFileProcessor(TsFileResource) at line 1942 acquires its own
write lock, making it thread-safe. The documentation should clarify which
overload requires external synchronization and which provides its own.
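A sketch of how the two overloads could state their contracts and enforce the unlocked one at runtime, assuming the region lock is a `ReentrantReadWriteLock`. The names `LockDocRegion`, `closeOne`, and `closeOneLocked` are hypothetical, and a `String` stands in for the processor. `isWriteLockedByCurrentThread()` lets the unlocked overload fail fast instead of silently racing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch, not the real DataRegion API.
class LockDocRegion {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  /**
   * Asynchronously closes the processor that belongs to {@code resourceName}.
   *
   * <p>Thread-safe: acquires the region write lock itself. Callers must not hold the read
   * lock, because a read lock cannot be upgraded to a write lock.
   */
  Future<?> closeOne(String resourceName) {
    lock.writeLock().lock();
    try {
      return closeOneLocked(resourceName);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Asynchronously closes the given processor.
   *
   * <p>Not thread-safe on its own: the caller must already hold the region write lock.
   */
  Future<?> closeOneLocked(String processorName) {
    if (!lock.isWriteLockedByCurrentThread()) {
      throw new IllegalStateException("caller must hold the write lock");
    }
    // Placeholder for the real close logic.
    return CompletableFuture.completedFuture(processorName);
  }
}
```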
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
}
+ public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+ writeLock("asyncCloseOneTsFileProcessor");
+ try {
+ return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+ } finally {
+ writeUnlock();
+ }
+ }
Review Comment:
The new public method asyncCloseOneTsFileProcessor(TsFileResource) lacks
documentation. It should have a JavaDoc comment explaining its purpose,
parameters, return value, and the idempotency guarantee mentioned in the PR
description (i.e., that multiple calls with the same TsFileResource will return
the same Future).
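A possible JavaDoc, shown on hypothetical stand-ins (`DocResource`, `DocRegion`) so the block compiles on its own. Here `synchronized` replaces the real `writeLock()`/`writeUnlock()` pair, and the idempotency wording follows the PR description rather than independently verified behavior:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for TsFileResource and its processor's close future.
class DocResource {
  final boolean hasProcessor;
  Future<?> closeFuture;

  DocResource(boolean hasProcessor) {
    this.hasProcessor = hasProcessor;
  }
}

class DocRegion {
  /**
   * Asynchronously closes the TsFileProcessor that backs {@code resource}.
   *
   * <p>Acquires the region write lock itself, so it is safe to call concurrently. It is
   * idempotent: once a close has been scheduled for the resource, every later call returns
   * the same {@link Future} instead of scheduling another close. If the resource has no
   * processor attached, an already-completed future is returned.
   *
   * @param resource the TsFile resource whose processor should be closed
   * @return a future that completes when the processor has been closed
   */
  synchronized Future<?> asyncCloseOne(DocResource resource) {
    // synchronized stands in for writeLock()/writeUnlock() in the real method.
    if (!resource.hasProcessor) {
      return CompletableFuture.completedFuture(null);
    }
    if (resource.closeFuture == null) {
      resource.closeFuture = new CompletableFuture<Void>();
    }
    return resource.closeFuture;
  }
}
```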
##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java:
##########
@@ -93,9 +93,13 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+import static org.junit.Assert.assertTrue;
Review Comment:
The changes add a static import for assertTrue (line 102), while the rest of
the file consistently uses Assert.assertEquals, Assert.assertFalse, etc.
without static imports. For consistency, either statically import all
assertions or none; the least disruptive fix is to drop this import and call
Assert.assertTrue like the other assertions in the file.
```suggestion
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1939,38 +1939,42 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
}
+ public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+ writeLock("asyncCloseOneTsFileProcessor");
+ try {
+ return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* close one tsfile processor, thread-safety should be ensured by caller
*
* @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
- // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
- // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
- || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
- || tsFileProcessor.alreadyMarkedClosing()) {
+ if (tsFileProcessor == null) {
return CompletableFuture.completedFuture(null);
}
- Future<?> future;
- if (sequence) {
- closingSequenceTsFileProcessor.add(tsFileProcessor);
- future = tsFileProcessor.asyncClose();
- if (future.isDone()) {
- closingSequenceTsFileProcessor.remove(tsFileProcessor);
- }
+ if (tsFileProcessor.getCloseFuture() != null) {
+ return tsFileProcessor.getCloseFuture();
+ }
Review Comment:
There's a race condition here. The check for closeFuture at line 1961
happens outside of the TsFileProcessor's flushQueryLock, while the closeFuture
is set inside that lock in TsFileProcessor.asyncClose(). This means a thread
could read null from getCloseFuture() here, then another thread could complete
asyncClose() and set closeFuture, and then the first thread would proceed to
call tsFileProcessor.asyncClose() again, potentially creating a second future.
Since closeFuture is not volatile, visibility is also not guaranteed.
```suggestion
```
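If taking `flushQueryLock` just for this check is undesirable, the check-and-publish step can instead be made atomic with `AtomicReference.compareAndSet`. A minimal sketch on a hypothetical `CasCloseProcessor`, assuming the actual close work can be started after the winning future is published: only the thread whose CAS succeeds starts the close, and every other caller gets the winner's future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the real TsFileProcessor.
class CasCloseProcessor {
  private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
  private final AtomicInteger closesStarted = new AtomicInteger();

  Future<?> asyncClose() {
    CompletableFuture<Void> existing = closeFuture.get();
    if (existing != null) {
      return existing;
    }
    CompletableFuture<Void> candidate = new CompletableFuture<>();
    if (closeFuture.compareAndSet(null, candidate)) {
      // Only the winning thread starts the close work.
      closesStarted.incrementAndGet();
      return candidate;
    }
    // Another thread won the race; return its future instead of ours.
    return closeFuture.get();
  }

  int closesStarted() {
    return closesStarted.get();
  }
}
```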
##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java:
##########
@@ -1772,4 +1776,21 @@ public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles()
Assert.assertFalse(tsFileResourceSeq.anyModFileExists());
Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
}
+
+ @Test
+ public void testFlushSpecifiedResource()
+ throws IllegalPathException, WriteProcessException, ExecutionException, InterruptedException {
+ for (int j = 100; j < 200; j++) {
+ TSRecord record = new TSRecord(deviceId, j);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+ dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+ }
+ TsFileResource tsFileResourceSeq = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+ Future<?> future = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+ Future<?> future2 = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+ assertTrue(future == future2 || future2 instanceof CompletableFuture);
Review Comment:
The test assertion logic is unclear and potentially incorrect. The condition
"future == future2 || future2 instanceof CompletableFuture" seems to expect
either the same future to be returned (future == future2) OR for future2 to be
a CompletableFuture. However, the PR description states "flushing the same
TsFile multiple times will result in the same future" which suggests future ==
future2 should always be true. The alternative condition "future2 instanceof
CompletableFuture" would allow the test to pass even when different futures are
returned, which defeats the purpose of testing idempotency. This should be
simplified to just assertTrue(future == future2).
```suggestion
assertTrue(future == future2);
```
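Sequential calls cannot expose the races discussed in the earlier comments. A hedged sketch of a concurrent check, written against a hypothetical `SyncCloser` stand-in and using only `java.util.concurrent`; in `DataRegionTest` the task body would call `dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq)` instead:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the method under test.
class SyncCloser {
  private Future<?> closeFuture;

  synchronized Future<?> asyncClose() {
    if (closeFuture == null) {
      closeFuture = new CompletableFuture<Void>();
    }
    return closeFuture;
  }
}

class ConcurrentCloseCheck {
  // Returns true when every concurrent caller received the very same future.
  static boolean allCallersSeeSameFuture(SyncCloser closer, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    List<Future<Future<?>>> results = new ArrayList<>();
    try {
      for (int i = 0; i < threads; i++) {
        Callable<Future<?>> task = () -> {
          start.await(); // release all threads at once to maximise contention
          return closer.asyncClose();
        };
        results.add(pool.submit(task));
      }
      start.countDown();
      Future<?> first = results.get(0).get();
      for (Future<Future<?>> result : results) {
        if (result.get() != first) {
          return false;
        }
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException e) {
      return false;
    } finally {
      pool.shutdownNow();
    }
  }
}
```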
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.