Copilot commented on code in PR #16209:
URL: https://github.com/apache/iotdb/pull/16209#discussion_r2290049957


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java:
##########
@@ -90,20 +96,30 @@ private void initialize() throws QueryProcessException {
           // for some reasons, we may get null QueryDataSource here.
           // And it's safe for us to throw this exception here in such case.
           throw new IllegalStateException("QueryDataSource should never be 
null!");
+        } else if (dataSource == UNFINISHED_QUERY_DATA_SOURCE) {

Review Comment:
   Using reference equality (==) to compare with UNFINISHED_QUERY_DATA_SOURCE 
is fragile. Consider using a method like `dataSource.isUnfinished()` or 
`Objects.equals()` for safer comparison.
   ```suggestion
           } else if (Objects.equals(dataSource, UNFINISHED_QUERY_DATA_SOURCE)) 
{
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -2070,72 +2065,212 @@ public void forceCloseAllWorkingTsFileProcessors() 
throws TsFileProcessorExcepti
     }
   }
 
-  /** used for query engine */
   @Override
+  @TestOnly
   public QueryDataSource query(
       List<IFullPath> pathList,
       IDeviceID singleDeviceId,
       QueryContext context,
       Filter globalTimeFilter,
       List<Long> timePartitions)
       throws QueryProcessException {
-    try {
-      List<TsFileResource> seqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(true, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
-              globalTimeFilter,
-              true);
-      List<TsFileResource> unseqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
-              globalTimeFilter,
-              false);
+    return query(
+        pathList, singleDeviceId, context, globalTimeFilter, timePartitions, 
Long.MAX_VALUE);
+  }
+
+  /** used for query engine */
+  @Override
+  public QueryDataSource query(
+      List<IFullPath> pathList,
+      IDeviceID singleDeviceId,
+      QueryContext context,
+      Filter globalTimeFilter,
+      List<Long> timePartitions,
+      long waitForLockTimeInMs)
+      throws QueryProcessException {
+
+    Pair<List<TsFileResource>, List<TsFileResource>> pair =
+        tsFileManager.getAllTsFileListForQuery(timePartitions, 
globalTimeFilter);
+
+    List<TsFileResource> seqTsFileResouceList = pair.left;
+    List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+    List<TsFileProcessor> needToUnLockList = new ArrayList<>();
+
+    boolean success =
+        tryGetFLushLock(
+            waitForLockTimeInMs,
+            singleDeviceId,
+            globalTimeFilter,
+            context.isDebug(),
+            seqTsFileResouceList,
+            unSeqTsFileResouceList,
+            needToUnLockList);
+
+    if (success) {
+      try {
+        List<TsFileResource> satisfiedSeqResourceList =
+            getFileResourceListForQuery(
+                seqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, true);
+
+        List<TsFileResource> satisfiedUnSeqResourceList =
+            getFileResourceListForQuery(
+                unSeqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, false);
+
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            SEQUENCE_TSFILE, satisfiedSeqResourceList.size());
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            UNSEQUENCE_TSFILE, satisfiedUnSeqResourceList.size());
+        return new QueryDataSource(
+            satisfiedSeqResourceList, satisfiedUnSeqResourceList, 
databaseName);
+      } catch (MetadataException e) {
+        throw new QueryProcessException(e);
+      } finally {
+        clearAlreadyLockedList(needToUnLockList);
+      }
+    } else {
+      // means that failed to acquire lock within the specific time
+      return null;
+    }
+  }
+
+  /**
+   * try to get flush lock for each unclosed satisfied tsfile
+   *
+   * @return true if lock successfully, otherwise false if return false, 
needToUnLockList will
+   *     always be empty because this method is responsible for unlocking all 
the already-acquiring
+   *     lock if return true, the caller is responsible for unlocking all the 
already-acquiring lock
+   *     in needToUnLockList
+   */
+  private boolean tryGetFLushLock(
+      long waitTimeInMs,
+      IDeviceID singleDeviceId,
+      Filter globalTimeFilter,
+      boolean isDebug,
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unSeqResources,
+      List<TsFileProcessor> needToUnLockList) {
+    // deal with seq resources
+    for (TsFileResource tsFileResource : seqResources) {
+      // only need to acquire flush lock for those unclosed and satisfied 
tsfile
+      if (!tsFileResource.isClosed()
+          && tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, 
true, isDebug)) {
+        TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+        try {
+          long startTime = System.nanoTime();
+          if (tsFileProcessor.tryReadLock(waitTimeInMs)) {
+            // minus already consumed time
+            waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;
 
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, 
seqResources.size());
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, 
unseqResources.size());
+            needToUnLockList.add(tsFileProcessor);
 
-      return new QueryDataSource(seqResources, unseqResources, databaseName);
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+            // no remaining time slice
+            if (waitTimeInMs <= 0) {
+              clearAlreadyLockedList(needToUnLockList);
+              return false;
+            }
+          } else {
+            clearAlreadyLockedList(needToUnLockList);
+            return false;
+          }
+        } catch (InterruptedException e) {

Review Comment:
   The InterruptedException handling clears the needToUnLockList but doesn't 
restore the interrupt status. Consider calling 
`Thread.currentThread().interrupt()` before returning to preserve the interrupt 
state.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -2070,72 +2065,212 @@ public void forceCloseAllWorkingTsFileProcessors() 
throws TsFileProcessorExcepti
     }
   }
 
-  /** used for query engine */
   @Override
+  @TestOnly
   public QueryDataSource query(
       List<IFullPath> pathList,
       IDeviceID singleDeviceId,
       QueryContext context,
       Filter globalTimeFilter,
       List<Long> timePartitions)
       throws QueryProcessException {
-    try {
-      List<TsFileResource> seqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(true, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
-              globalTimeFilter,
-              true);
-      List<TsFileResource> unseqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
-              globalTimeFilter,
-              false);
+    return query(
+        pathList, singleDeviceId, context, globalTimeFilter, timePartitions, 
Long.MAX_VALUE);
+  }
+
+  /** used for query engine */
+  @Override
+  public QueryDataSource query(
+      List<IFullPath> pathList,
+      IDeviceID singleDeviceId,
+      QueryContext context,
+      Filter globalTimeFilter,
+      List<Long> timePartitions,
+      long waitForLockTimeInMs)
+      throws QueryProcessException {
+
+    Pair<List<TsFileResource>, List<TsFileResource>> pair =
+        tsFileManager.getAllTsFileListForQuery(timePartitions, 
globalTimeFilter);
+
+    List<TsFileResource> seqTsFileResouceList = pair.left;
+    List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+    List<TsFileProcessor> needToUnLockList = new ArrayList<>();
+
+    boolean success =
+        tryGetFLushLock(
+            waitForLockTimeInMs,
+            singleDeviceId,
+            globalTimeFilter,
+            context.isDebug(),
+            seqTsFileResouceList,
+            unSeqTsFileResouceList,
+            needToUnLockList);
+
+    if (success) {
+      try {
+        List<TsFileResource> satisfiedSeqResourceList =
+            getFileResourceListForQuery(
+                seqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, true);
+
+        List<TsFileResource> satisfiedUnSeqResourceList =
+            getFileResourceListForQuery(
+                unSeqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, false);
+
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            SEQUENCE_TSFILE, satisfiedSeqResourceList.size());
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            UNSEQUENCE_TSFILE, satisfiedUnSeqResourceList.size());
+        return new QueryDataSource(
+            satisfiedSeqResourceList, satisfiedUnSeqResourceList, 
databaseName);
+      } catch (MetadataException e) {
+        throw new QueryProcessException(e);
+      } finally {
+        clearAlreadyLockedList(needToUnLockList);
+      }
+    } else {
+      // means that failed to acquire lock within the specific time
+      return null;
+    }
+  }
+
+  /**
+   * try to get flush lock for each unclosed satisfied tsfile
+   *
+   * @return true if lock successfully, otherwise false if return false, 
needToUnLockList will
+   *     always be empty because this method is responsible for unlocking all 
the already-acquiring
+   *     lock if return true, the caller is responsible for unlocking all the 
already-acquiring lock
+   *     in needToUnLockList
+   */
+  private boolean tryGetFLushLock(
+      long waitTimeInMs,
+      IDeviceID singleDeviceId,
+      Filter globalTimeFilter,
+      boolean isDebug,
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unSeqResources,
+      List<TsFileProcessor> needToUnLockList) {
+    // deal with seq resources
+    for (TsFileResource tsFileResource : seqResources) {
+      // only need to acquire flush lock for those unclosed and satisfied 
tsfile
+      if (!tsFileResource.isClosed()
+          && tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, 
true, isDebug)) {
+        TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+        try {
+          long startTime = System.nanoTime();
+          if (tsFileProcessor.tryReadLock(waitTimeInMs)) {
+            // minus already consumed time
+            waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;

Review Comment:
   The wait time calculation could result in negative values if the lock 
acquisition takes longer than expected. This should be handled to prevent 
passing negative timeouts to subsequent operations.
   ```suggestion
               waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;
               if (waitTimeInMs < 0) {
                 waitTimeInMs = 0;
               }
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java:
##########
@@ -586,100 +585,162 @@ public void initQueryDataSource(List<IFullPath> 
sourcePaths) throws QueryProcess
       }
     }
 
-    dataRegion.readLock();
-    try {
-
-      this.sharedQueryDataSource =
-          dataRegion.query(
-              sourcePaths,
-              // when all the selected series are under the same device, the 
QueryDataSource will be
-              // filtered according to timeIndex
-              singleDeviceId,
-              this,
-              // time filter may be stateful, so we need to copy it
-              globalTimeFilter != null ? globalTimeFilter.copy() : null,
-              timePartitions);
-
-      // used files should be added before mergeLock is unlocked, or they may 
be deleted by
-      // running merge
-      if (sharedQueryDataSource != null) {
-        closedFilePaths = new HashSet<>();
-        unClosedFilePaths = new HashSet<>();
-        addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
-        ((QueryDataSource) 
sharedQueryDataSource).setSingleDevice(singleDeviceId != null);
+    long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs();
+    long startAcquireLockTime = System.nanoTime();
+    if (dataRegion.tryReadLock(waitForLockTime)) {
+      try {
+        // minus already consumed time
+        waitForLockTime -= (System.nanoTime() - startAcquireLockTime) / 
1_000_000;
+
+        // no remaining time slice
+        if (waitForLockTime <= 0) {
+          return false;
+        }
+
+        this.sharedQueryDataSource =
+            dataRegion.query(
+                sourcePaths,
+                // when all the selected series are under the same device, the 
QueryDataSource will
+                // be
+                // filtered according to timeIndex
+                singleDeviceId,
+                this,
+                // time filter may be stateful, so we need to copy it
+                globalTimeFilter != null ? globalTimeFilter.copy() : null,
+                timePartitions,
+                waitForLockTime);
+
+        // used files should be added before mergeLock is unlocked, or they 
may be deleted by
+        // running merge
+        if (sharedQueryDataSource != null) {
+          closedFilePaths = new HashSet<>();
+          unClosedFilePaths = new HashSet<>();
+          addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
+          ((QueryDataSource) 
sharedQueryDataSource).setSingleDevice(singleDeviceId != null);
+          return true;
+        } else {
+          // failed to acquire lock within the specific time

Review Comment:
   [nitpick] The logic in this else block handles the case where query data 
source is null, but this condition should be explicitly checked rather than 
relying on an else clause for clarity.
   ```suggestion
           }
           // failed to acquire lock within the specific time
           if (sharedQueryDataSource == null) {
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -2070,72 +2065,212 @@ public void forceCloseAllWorkingTsFileProcessors() 
throws TsFileProcessorExcepti
     }
   }
 
-  /** used for query engine */
   @Override
+  @TestOnly
   public QueryDataSource query(
       List<IFullPath> pathList,
       IDeviceID singleDeviceId,
       QueryContext context,
       Filter globalTimeFilter,
       List<Long> timePartitions)
       throws QueryProcessException {
-    try {
-      List<TsFileResource> seqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(true, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
-              globalTimeFilter,
-              true);
-      List<TsFileResource> unseqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
-              globalTimeFilter,
-              false);
+    return query(
+        pathList, singleDeviceId, context, globalTimeFilter, timePartitions, 
Long.MAX_VALUE);
+  }
+
+  /** used for query engine */
+  @Override
+  public QueryDataSource query(
+      List<IFullPath> pathList,
+      IDeviceID singleDeviceId,
+      QueryContext context,
+      Filter globalTimeFilter,
+      List<Long> timePartitions,
+      long waitForLockTimeInMs)
+      throws QueryProcessException {
+
+    Pair<List<TsFileResource>, List<TsFileResource>> pair =
+        tsFileManager.getAllTsFileListForQuery(timePartitions, 
globalTimeFilter);
+
+    List<TsFileResource> seqTsFileResouceList = pair.left;
+    List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+    List<TsFileProcessor> needToUnLockList = new ArrayList<>();
+
+    boolean success =
+        tryGetFLushLock(
+            waitForLockTimeInMs,
+            singleDeviceId,
+            globalTimeFilter,
+            context.isDebug(),
+            seqTsFileResouceList,
+            unSeqTsFileResouceList,
+            needToUnLockList);
+
+    if (success) {
+      try {
+        List<TsFileResource> satisfiedSeqResourceList =
+            getFileResourceListForQuery(
+                seqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, true);
+
+        List<TsFileResource> satisfiedUnSeqResourceList =
+            getFileResourceListForQuery(
+                unSeqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, false);
+
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            SEQUENCE_TSFILE, satisfiedSeqResourceList.size());
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            UNSEQUENCE_TSFILE, satisfiedUnSeqResourceList.size());
+        return new QueryDataSource(
+            satisfiedSeqResourceList, satisfiedUnSeqResourceList, 
databaseName);
+      } catch (MetadataException e) {
+        throw new QueryProcessException(e);
+      } finally {
+        clearAlreadyLockedList(needToUnLockList);
+      }
+    } else {
+      // means that failed to acquire lock within the specific time
+      return null;
+    }
+  }
+
+  /**
+   * try to get flush lock for each unclosed satisfied tsfile
+   *
+   * @return true if lock successfully, otherwise false if return false, 
needToUnLockList will
+   *     always be empty because this method is responsible for unlocking all 
the already-acquiring
+   *     lock if return true, the caller is responsible for unlocking all the 
already-acquiring lock
+   *     in needToUnLockList
+   */
+  private boolean tryGetFLushLock(
+      long waitTimeInMs,
+      IDeviceID singleDeviceId,
+      Filter globalTimeFilter,
+      boolean isDebug,
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unSeqResources,
+      List<TsFileProcessor> needToUnLockList) {
+    // deal with seq resources
+    for (TsFileResource tsFileResource : seqResources) {
+      // only need to acquire flush lock for those unclosed and satisfied 
tsfile
+      if (!tsFileResource.isClosed()
+          && tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, 
true, isDebug)) {
+        TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+        try {
+          long startTime = System.nanoTime();
+          if (tsFileProcessor.tryReadLock(waitTimeInMs)) {
+            // minus already consumed time
+            waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;
 
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, 
seqResources.size());
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, 
unseqResources.size());
+            needToUnLockList.add(tsFileProcessor);
 
-      return new QueryDataSource(seqResources, unseqResources, databaseName);
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+            // no remaining time slice
+            if (waitTimeInMs <= 0) {
+              clearAlreadyLockedList(needToUnLockList);
+              return false;
+            }
+          } else {
+            clearAlreadyLockedList(needToUnLockList);
+            return false;
+          }
+        } catch (InterruptedException e) {
+          for (TsFileProcessor processor : needToUnLockList) {
+            processor.readUnLock();
+          }
+          needToUnLockList.clear();
+          Thread.currentThread().interrupt();

Review Comment:
   Good practice to restore interrupt status, but this should be done 
consistently in all catch blocks handling InterruptedException (see line 2177 
which is missing this).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to