Copilot commented on code in PR #15611:
URL: https://github.com/apache/iotdb/pull/15611#discussion_r2113460289


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java:
##########
@@ -463,7 +463,7 @@ private void verifySchema(ISchemaTree schemaTree)
         }
 
         // check datatype
-        if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
+        if (tsFileSchema.getType().equals(iotdbSchema.getType())) {

Review Comment:
   The condition is inverted: it throws when the types match rather than when
they differ. It should read
`if (!tsFileSchema.getType().equals(iotdbSchema.getType()))`.
   ```suggestion
           if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
   ```
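
To make the effect of the inverted condition concrete, here is a minimal, self-contained sketch. The `DataType` enum, `isTypeMismatch`, and `verify` are hypothetical stand-ins, not the project's actual types or methods; the sketch only illustrates that the check should fire when the two types differ.

```java
import java.util.Objects;

final class DataTypeCheckSketch {
  // Hypothetical stand-in for the real data type enum.
  enum DataType { INT32, INT64, DOUBLE, TEXT }

  /** Returns true when the type stored in the file differs from the registered type. */
  static boolean isTypeMismatch(DataType tsFileType, DataType iotdbType) {
    return !Objects.equals(tsFileType, iotdbType);
  }

  /** Throws only on a mismatch, which is the behavior of the negated condition. */
  static void verify(DataType tsFileType, DataType iotdbType) {
    if (isTypeMismatch(tsFileType, iotdbType)) {
      throw new IllegalStateException(
          "Data type mismatch: file has " + tsFileType + ", schema has " + iotdbType);
    }
  }

  public static void main(String[] args) {
    verify(DataType.INT32, DataType.INT32); // matching types pass silently
    try {
      verify(DataType.INT32, DataType.DOUBLE); // differing types are rejected
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Without the negation, the first call would throw and the second would pass, which is the reverse of the intended verification.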



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java:
##########
@@ -1082,6 +1082,8 @@ public class IoTDBConfig {
 
   private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
 
+  private int loadTsFileTabletConversionThreadCount = 5;

Review Comment:
   The new config property `loadTsFileTabletConversionThreadCount` should 
include a Javadoc comment explaining its purpose and default, and the external 
configuration documentation should be updated.
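
A possible shape for the requested Javadoc is sketched below. The wording of the comment is an assumption inferred from how the value is used in this PR (sizing the pool that executes converted-tablet insertions), and the setter and the enclosing `LoadConfigSketch` class are hypothetical; only the field name, its default of 5, and the getter name come from the diff.

```java
final class LoadConfigSketch {
  /**
   * Number of threads used to execute insertions of converted tablets while loading a TsFile
   * whose data types need conversion (assumed purpose, inferred from the PR). Default: 5.
   */
  private int loadTsFileTabletConversionThreadCount = 5;

  public int getLoadTsFileTabletConversionThreadCount() {
    return loadTsFileTabletConversionThreadCount;
  }

  // Hypothetical setter, shown only so the sketch is complete.
  public void setLoadTsFileTabletConversionThreadCount(int threadCount) {
    this.loadTsFileTabletConversionThreadCount = threadCount;
  }

  public static void main(String[] args) {
    System.out.println(new LoadConfigSketch().getLoadTsFileTabletConversionThreadCount());
  }
}
```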



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java:
##########
@@ -180,43 +239,67 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
     return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
-  private TSStatus executeInsertMultiTabletsWithRetry(
-      final List<PipeTransferTabletRawReq> tabletRawReqs, boolean isConvertOnTypeMismatch) {
-    final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement();
-    batchStatement.setInsertTabletStatementList(
-        tabletRawReqs.stream()
-            .map(
-                req ->
-                    new LoadConvertedInsertTabletStatement(
-                        req.constructStatement(), isConvertOnTypeMismatch))
-            .collect(Collectors.toList()));
-
-    TSStatus result;
-    try {
-      result =
-          batchStatement.accept(
-              LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
-              statementExecutor.execute(batchStatement));
-
-      // Retry max 5 times if the write process is rejected
-      for (int i = 0;
-          i < 5
-              && result.getCode()
-                  == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
-          i++) {
-        Thread.sleep(100L * (i + 1));
-        result =
-            batchStatement.accept(
-                LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
-                statementExecutor.execute(batchStatement));
-      }
-    } catch (final Exception e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
+  private Future<TSStatus> executeInsertMultiTabletsWithRetry(
+      final InsertMultiTabletsStatement batchStatement) {
+    return getExecutorPool()
+        .submit(
+            () -> {
+              TSStatus result;
+              try {
+                result =
+                    batchStatement.accept(
+                        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+                        statementExecutor.execute(batchStatement));
+
+                // Retry max 5 times if the write process is rejected
+                for (int i = 0;
+                    i < 5
+                        && result.getCode()
+                            == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+                    i++) {
+                  Thread.sleep(100L * (i + 1));
+                  result =
+                      batchStatement.accept(
+                          LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+                          statementExecutor.execute(batchStatement));
+                }
+              } catch (final Exception e) {
+                if (e instanceof InterruptedException) {
+                  Thread.currentThread().interrupt();
+                }
+                result =
+                    batchStatement.accept(
+                        LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+              }
+              return result;
+            });
+  }
+
+  public static WrappedThreadPoolExecutor getExecutorPool() {
+    if (executorPool.get() == null) {
+      synchronized (executorPool) {

Review Comment:
   [nitpick] Synchronizing on the `AtomicReference` instance is unconventional. 
Consider using a dedicated lock object or an atomic compare-and-set for 
thread-safe lazy initialization.
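
A minimal sketch of the compare-and-set variant follows. It uses a plain `ExecutorService` from `Executors.newFixedThreadPool` as a stand-in for `WrappedThreadPoolExecutor`, and the class name, field name, and hard-coded pool size of 5 are assumptions for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class LazyExecutorSketch {
  private static final AtomicReference<ExecutorService> EXECUTOR_POOL = new AtomicReference<>();

  /** Lazily creates the shared pool without synchronizing on the AtomicReference itself. */
  static ExecutorService getExecutorPool() {
    ExecutorService pool = EXECUTOR_POOL.get();
    if (pool == null) {
      // Build a candidate; only one racing thread manages to publish its instance.
      final ExecutorService candidate = Executors.newFixedThreadPool(5);
      if (EXECUTOR_POOL.compareAndSet(null, candidate)) {
        pool = candidate;
      } else {
        // Another thread won the race: discard our candidate and use the published pool.
        candidate.shutdown();
        pool = EXECUTOR_POOL.get();
      }
    }
    return pool;
  }

  public static void main(String[] args) {
    final ExecutorService first = getExecutorPool();
    final ExecutorService second = getExecutorPool();
    System.out.println(first == second);
    first.shutdown();
  }
}
```

The trade-off is that a losing thread briefly constructs a pool it then discards. If constructing the executor is expensive or has side effects, double-checked locking on a dedicated `private static final Object` lock avoids that at the cost of a short critical section.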



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java:
##########
@@ -133,25 +178,39 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
 
       if (!tabletRawReqs.isEmpty()) {
         try {
-          final TSStatus result =
-              executeInsertMultiTabletsWithRetry(
-                  tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());
+          final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement();
+          batchStatement.setInsertTabletStatementList(
+              tabletRawReqs.stream()
+                  .map(
+                      req ->
+                          new LoadConvertedInsertTabletStatement(
+                              req.constructStatement(),
+                              loadTsFileStatement.isConvertOnTypeMismatch()))
+                  .collect(Collectors.toList()));
+          executionFutures.add(executeInsertMultiTabletsWithRetry(batchStatement));
 
           for (final long memoryCost : tabletRawReqSizes) {
             block.reduceMemoryUsage(memoryCost);
           }
           tabletRawReqs.clear();
           tabletRawReqSizes.clear();
-
-          if (!handleTSStatus(result, loadTsFileStatement)) {
-            return Optional.empty();
-          }
         } catch (final Exception e) {
           LOGGER.warn(
               "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
           return Optional.empty();
         }
       }
+
+      for (final Future<TSStatus> future : executionFutures) {
+        try {
+          if (!handleTSStatus(future.get(), loadTsFileStatement)) {
+            return Optional.empty();
+          }
+        } catch (ExecutionException | InterruptedException e) {
+          LOGGER.warn("Exception occurs when executing insertion during tablet conversion: ", e);
+          return Optional.empty();
+        }

Review Comment:
   [nitpick] The logic for processing futures is duplicated in 
`LoadTableStatementDataTypeConvertExecutionVisitor`. Consider extracting a 
shared helper to reduce code duplication.
   ```suggestion
         if (!processFutures(executionFutures, loadTsFileStatement)) {
           return Optional.empty();
   ```
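
One way such a shared helper could look is sketched below. The real helper would presumably take the `LoadTsFileStatement` and call `handleTSStatus`; since those types are not available here, this sketch takes a generic `Predicate` in their place, and the class name is hypothetical. It also restores the interrupt flag on `InterruptedException`, which the duplicated loop in the diff does not do.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;

final class FutureProcessingSketch {
  /**
   * Waits for every future in order and returns false on the first result that the
   * predicate rejects or on the first future that fails or is interrupted.
   */
  static <T> boolean processFutures(
      List<? extends Future<T>> futures, Predicate<? super T> isSuccess) {
    for (final Future<T> future : futures) {
      try {
        if (!isSuccess.test(future.get())) {
          return false;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return false;
      } catch (ExecutionException e) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    final CompletableFuture<Integer> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("insert rejected"));

    System.out.println(
        processFutures(List.of(CompletableFuture.completedFuture(200)), code -> code == 200));
    System.out.println(
        processFutures(List.of(CompletableFuture.completedFuture(200), failed), code -> code == 200));
  }
}
```

Both the tree and table visitors could then call the same method and return `Optional.empty()` when it yields false.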



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java:
##########
@@ -99,27 +133,38 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
                 PipeTransferTabletRawReq.toTPipeTransferRawReq(
                     tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight());
             final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1;
-            if (block.hasEnoughMemory(curMemory)) {
+            if (block.hasEnoughMemory(
+                curMemory
+                    + TABLET_BATCH_MEMORY_SIZE_IN_BYTES
+                        * Math.max(
+                            0,
+                            (IoTDBDescriptor.getInstance()
+                                    .getConfig()
+                                    .getLoadTsFileTabletConversionThreadCount()
+                                - 1)))) {

Review Comment:
   [nitpick] On every iteration the memory check adds the batch size multiplied
by (thread count - 1) to the requested amount, which could lead to
over-reservation and blocking. Consider calculating a per-task threshold
instead.
   ```suggestion
               // Math.max(1, ...) guards against division by zero if the thread count is set to 0.
               final long perTaskThreshold =
                   TABLET_BATCH_MEMORY_SIZE_IN_BYTES
                       / Math.max(
                           1,
                           IoTDBDescriptor.getInstance()
                               .getConfig()
                               .getLoadTsFileTabletConversionThreadCount());
               if (block.hasEnoughMemory(curMemory + perTaskThreshold)) {
   ```
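
To compare the two checks numerically, the sketch below computes the amount each one asks `hasEnoughMemory` for. The method names and class are hypothetical, the batch size of 4096 * 1024 bytes is assumed to match the default shown in `IoTDBConfig`, and the `Math.max(1, ...)` guard in the second method is an added safeguard rather than part of the PR.

```java
final class MemoryThresholdSketch {
  /** Amount requested by the check in the diff: current tablet plus (threads - 1) full batches. */
  static long requestedByDiff(long curMemory, long batchBytes, int threadCount) {
    return curMemory + batchBytes * Math.max(0, threadCount - 1);
  }

  /** Amount requested by the per-task variant: current tablet plus one batch split across threads. */
  static long requestedPerTask(long curMemory, long batchBytes, int threadCount) {
    return curMemory + batchBytes / Math.max(1, threadCount);
  }

  public static void main(String[] args) {
    final long batchBytes = 4096L * 1024; // assumed default batch size in bytes
    final long curMemory = 1024;          // example tablet size in bytes
    final int threadCount = 5;            // default thread count from the diff

    System.out.println(requestedByDiff(curMemory, batchBytes, threadCount));  // 16778240
    System.out.println(requestedPerTask(curMemory, batchBytes, threadCount)); // 839884
  }
}
```

With these example values the diff's check requires roughly 16 MiB of headroom for a 1 KiB tablet, while the per-task variant requires under 1 MiB. Whether the smaller threshold is safe depends on how much memory in-flight batches actually hold, which this sketch does not model.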



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
