jt2594838 commented on code in PR #16853:
URL: https://github.com/apache/iotdb/pull/16853#discussion_r2588557035


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -365,16 +366,29 @@ private TSExecuteStatementResp executeStatementInternal(
 
           queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.statementId);
 
-          result =
-              COORDINATOR.executeForTreeModel(
-                  s,
-                  queryId,
-                  SESSION_MANAGER.getSessionInfo(clientSession),
-                  statement,
-                  partitionFetcher,
-                  schemaFetcher,
-                  req.getTimeout(),
-                  true);
+          // For synchronous multi-file loading, split into sub-statements for 
batch execution
+          if (shouldSplitLoadTsFileStatement(s, false)) {
+            result =
+                executeBatchLoadTsFile(
+                    (LoadTsFileStatement) s,
+                    queryId,
+                    SESSION_MANAGER.getSessionInfo(clientSession),
+                    statement,
+                    partitionFetcher,
+                    schemaFetcher,
+                    config.getQueryTimeoutThreshold());
+          } else {
+            result =
+                COORDINATOR.executeForTreeModel(
+                    s,
+                    queryId,
+                    SESSION_MANAGER.getSessionInfo(clientSession),
+                    statement,
+                    partitionFetcher,
+                    schemaFetcher,
+                    req.getTimeout(),
+                    true);
+          }

Review Comment:
   I do not think it is good design to add very specialized processing logic for 
a single statement type in a top-level function like `executeStatementInternal`. I 
think we'd better introduce an abstraction, like:
   
   shouldSplitLoadTsFileStatement -> shouldSplitStatement
   executeBatchLoadTsFile -> executeBatchStatement
    (LoadTsFileStatement) -> (SplitableStatement)



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -3190,4 +3246,202 @@ public void handleClientExit() {
     PipeDataNodeAgent.receiver().legacy().handleClientExit();
     SubscriptionAgent.receiver().handleClientExit();
   }
+
+  /**
+   * Determines whether a tree-model LoadTsFileStatement should be split into 
multiple
+   * sub-statements for execution.
+   *
+   * @param statement the Statement to be executed
+   * @param requireAsync whether async loading is required
+   * @return true if the statement should be split for execution, false 
otherwise
+   */
+  private boolean shouldSplitLoadTsFileStatement(Statement statement, boolean 
requireAsync) {
+    if (!(statement instanceof LoadTsFileStatement)) {
+      return false;
+    }
+    LoadTsFileStatement loadStmt = (LoadTsFileStatement) statement;
+    return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == 
requireAsync;
+  }
+
+  /**
+   * Determines whether a table-model LoadTsFile should be split into multiple 
sub-statements for
+   * execution.
+   *
+   * @param statement the Statement to be executed
+   * @param requireAsync whether async loading is required
+   * @return true if the statement should be split for execution, false 
otherwise
+   */
+  private boolean shouldSplitTableLoadTsFile(
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement,
+      boolean requireAsync) {
+    if (!(statement
+        instanceof 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile)) {
+      return false;
+    }
+    org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile 
loadStmt =
+        (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) 
statement;
+    return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == 
requireAsync;
+  }
+
+  /**
+   * Executes tree-model LoadTsFileStatement sub-statements in batch.
+   *
+   * @param loadTsFileStatement the LoadTsFileStatement to be executed
+   * @param queryId the query ID
+   * @param sessionInfo the session information
+   * @param statement the SQL statement string
+   * @param partitionFetcher the partition fetcher
+   * @param schemaFetcher the schema fetcher
+   * @param timeoutMs the timeout in milliseconds
+   * @return the execution result
+   */
+  private ExecutionResult executeBatchLoadTsFile(
+      LoadTsFileStatement loadTsFileStatement,
+      long queryId,
+      SessionInfo sessionInfo,
+      String statement,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher,
+      long timeoutMs) {
+
+    ExecutionResult result = null;
+    List<LoadTsFileStatement> subStatements = 
loadTsFileStatement.getSubStatement();
+    int totalFiles = subStatements.size();
+
+    LOGGER.info("Start batch loading {} TsFile(s) in tree model, queryId: {}", 
totalFiles, queryId);
+
+    for (int i = 0; i < totalFiles; i++) {
+      LoadTsFileStatement subStatement = subStatements.get(i);
+      LOGGER.info(
+          "Loading TsFile {}/{} in tree model, file: {}, queryId: {}",
+          i + 1,
+          totalFiles,
+          subStatement.getTsFiles().get(0).getName(),
+          queryId);
+
+      result =
+          COORDINATOR.executeForTreeModel(
+              subStatement,
+              queryId,
+              sessionInfo,
+              statement,
+              partitionFetcher,
+              schemaFetcher,
+              timeoutMs,
+              false);
+
+      // Exit early if any sub-statement execution fails
+      if (result != null
+          && result.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.warn(
+            "Failed to load TsFile {}/{} in tree model, file: {}, queryId: {}, 
error: {}",
+            i + 1,
+            totalFiles,
+            subStatement.getTsFiles().get(0).getName(),
+            queryId,
+            result.status.getMessage());
+        break;
+      }

Review Comment:
   Should the result include how many files have been loaded?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java:
##########
@@ -232,6 +232,43 @@ public boolean 
reconstructStatementIfMiniFileConverted(final List<Boolean> isMin
     return tsFiles == null || tsFiles.isEmpty();
   }
 
+  /**
+   * Splits the current LoadTsFile statement into multiple sub-statements, 
each handling one TsFile.
+   * Used to support batch execution when loading multiple files.
+   *
+   * @return the list of sub-statements
+   */

Review Comment:
   Is it possible that each sub-statement handles n (maybe configurable) 
TsFiles?
   Will n affect performance? May need an experiment.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1845,16 +1873,30 @@ public TSStatus 
executeBatchStatement(TSExecuteBatchStatementReq req) {
               queryId = SESSION_MANAGER.requestQueryId();
               type = s.getType() == null ? null : s.getType().name();
               // create and cache dataset
-              result =
-                  COORDINATOR.executeForTreeModel(
-                      s,
-                      queryId,
-                      SESSION_MANAGER.getSessionInfo(clientSession),
-                      statement,
-                      partitionFetcher,
-                      schemaFetcher,
-                      config.getQueryTimeoutThreshold(),
-                      false);
+
+              // For asynchronous multi-file loading, split into 
sub-statements for batch execution
+              if (shouldSplitLoadTsFileStatement(s, true)) {

Review Comment:
   Why is executeBatchStatement treated as equivalent to an asynchronous load?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java:
##########
@@ -232,6 +232,43 @@ public boolean 
reconstructStatementIfMiniFileConverted(final List<Boolean> isMin
     return tsFiles == null || tsFiles.isEmpty();
   }
 
+  /**
+   * Splits the current LoadTsFile statement into multiple sub-statements, 
each handling one TsFile.
+   * Used to support batch execution when loading multiple files.

Review Comment:
   "Used to support batch execution when loading multiple files." -> "Used to limit 
resource consumption during statement analysis, etc."



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -3190,4 +3246,202 @@ public void handleClientExit() {
     PipeDataNodeAgent.receiver().legacy().handleClientExit();
     SubscriptionAgent.receiver().handleClientExit();
   }
+
+  /**
+   * Determines whether a tree-model LoadTsFileStatement should be split into 
multiple
+   * sub-statements for execution.
+   *
+   * @param statement the Statement to be executed
+   * @param requireAsync whether async loading is required
+   * @return true if the statement should be split for execution, false 
otherwise
+   */
+  private boolean shouldSplitLoadTsFileStatement(Statement statement, boolean 
requireAsync) {
+    if (!(statement instanceof LoadTsFileStatement)) {
+      return false;
+    }
+    LoadTsFileStatement loadStmt = (LoadTsFileStatement) statement;
+    return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == 
requireAsync;
+  }
+
+  /**
+   * Determines whether a table-model LoadTsFile should be split into multiple 
sub-statements for
+   * execution.
+   *
+   * @param statement the Statement to be executed
+   * @param requireAsync whether async loading is required
+   * @return true if the statement should be split for execution, false 
otherwise
+   */
+  private boolean shouldSplitTableLoadTsFile(
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement,
+      boolean requireAsync) {
+    if (!(statement
+        instanceof 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile)) {
+      return false;
+    }
+    org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile 
loadStmt =
+        (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) 
statement;
+    return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == 
requireAsync;
+  }
+
+  /**
+   * Executes tree-model LoadTsFileStatement sub-statements in batch.
+   *
+   * @param loadTsFileStatement the LoadTsFileStatement to be executed
+   * @param queryId the query ID
+   * @param sessionInfo the session information
+   * @param statement the SQL statement string
+   * @param partitionFetcher the partition fetcher
+   * @param schemaFetcher the schema fetcher
+   * @param timeoutMs the timeout in milliseconds
+   * @return the execution result
+   */
+  private ExecutionResult executeBatchLoadTsFile(
+      LoadTsFileStatement loadTsFileStatement,
+      long queryId,
+      SessionInfo sessionInfo,
+      String statement,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher,
+      long timeoutMs) {
+
+    ExecutionResult result = null;
+    List<LoadTsFileStatement> subStatements = 
loadTsFileStatement.getSubStatement();
+    int totalFiles = subStatements.size();

Review Comment:
   totalFiles -> totalFileNum
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to