Copilot commented on code in PR #15611:
URL: https://github.com/apache/iotdb/pull/15611#discussion_r2113460289
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java:
##########
@@ -463,7 +463,7 @@ private void verifySchema(ISchemaTree schemaTree)
}
// check datatype
- if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
+ if (tsFileSchema.getType().equals(iotdbSchema.getType())) {
Review Comment:
The condition is inverted: it throws when the types match rather than when they
differ. It should read `if (!tsFileSchema.getType().equals(iotdbSchema.getType()))`.
```suggestion
if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
```
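The intended semantics of the check can be shown in a minimal sketch. It assumes a hypothetical `DataType` enum standing in for IoTDB's `TSDataType` and a hypothetical `verifyDataType` helper. It throws only when the two types differ, which is what the negated condition in the suggestion expresses.

```java
final class SchemaTypeCheckSketch {
  // Hypothetical stand-in for IoTDB's TSDataType; not the real enum.
  enum DataType {
    BOOLEAN,
    INT32,
    INT64,
    FLOAT,
    DOUBLE,
    TEXT
  }

  private SchemaTypeCheckSketch() {}

  // Hypothetical helper: rejects the load only when the TsFile type differs from the IoTDB type.
  static void verifyDataType(String measurementPath, DataType tsFileType, DataType iotdbType) {
    if (!tsFileType.equals(iotdbType)) {
      throw new IllegalStateException(
          String.format(
              "Data type mismatch for %s: TsFile has %s, IoTDB has %s",
              measurementPath, tsFileType, iotdbType));
    }
  }
}
```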
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java:
##########
@@ -1082,6 +1082,8 @@ public class IoTDBConfig {
private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
+ private int loadTsFileTabletConversionThreadCount = 5;
Review Comment:
The new config property `loadTsFileTabletConversionThreadCount` should
have a Javadoc comment explaining its purpose and its default value (5), and the
external configuration documentation should be updated to describe it.
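A Javadoc along these lines could serve. This is a sketch only: the class name is hypothetical, the Javadoc wording is a guess at the intent (assuming the value sizes the pool returned by `getExecutorPool()`), and the lower-bound validation in the setter is an assumption, not something the PR contains.

```java
final class LoadConversionConfigSketch {

  /**
   * Number of threads used to execute, in parallel, the insert-tablet batches produced when LOAD
   * converts TsFile data whose types do not match the existing schema. Defaults to 5. Must be at
   * least 1.
   */
  private int loadTsFileTabletConversionThreadCount = 5;

  int getLoadTsFileTabletConversionThreadCount() {
    return loadTsFileTabletConversionThreadCount;
  }

  void setLoadTsFileTabletConversionThreadCount(int threadCount) {
    // Hypothetical validation: a pool with no threads could never run the conversion batches.
    if (threadCount < 1) {
      throw new IllegalArgumentException(
          "loadTsFileTabletConversionThreadCount must be >= 1, got " + threadCount);
    }
    this.loadTsFileTabletConversionThreadCount = threadCount;
  }
}
```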
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java:
##########
@@ -180,43 +239,67 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- private TSStatus executeInsertMultiTabletsWithRetry(
- final List<PipeTransferTabletRawReq> tabletRawReqs, boolean isConvertOnTypeMismatch) {
- final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement();
- batchStatement.setInsertTabletStatementList(
- tabletRawReqs.stream()
- .map(
- req ->
- new LoadConvertedInsertTabletStatement(
- req.constructStatement(), isConvertOnTypeMismatch))
- .collect(Collectors.toList()));
-
- TSStatus result;
- try {
- result =
- batchStatement.accept(
- LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
- statementExecutor.execute(batchStatement));
-
- // Retry max 5 times if the write process is rejected
- for (int i = 0;
- i < 5
- && result.getCode()
- == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
- i++) {
- Thread.sleep(100L * (i + 1));
- result =
- batchStatement.accept(
- LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
- statementExecutor.execute(batchStatement));
- }
- } catch (final Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ private Future<TSStatus> executeInsertMultiTabletsWithRetry(
+ final InsertMultiTabletsStatement batchStatement) {
+ return getExecutorPool()
+ .submit(
+ () -> {
+ TSStatus result;
+ try {
+ result =
+ batchStatement.accept(
+ LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+ statementExecutor.execute(batchStatement));
+
+ // Retry max 5 times if the write process is rejected
+ for (int i = 0;
+ i < 5
+ && result.getCode()
+ == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+ i++) {
+ Thread.sleep(100L * (i + 1));
+ result =
+ batchStatement.accept(
+ LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+ statementExecutor.execute(batchStatement));
+ }
+ } catch (final Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ result =
+ batchStatement.accept(
+ LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+ }
+ return result;
+ });
+ }
+
+ public static WrappedThreadPoolExecutor getExecutorPool() {
+ if (executorPool.get() == null) {
+ synchronized (executorPool) {
Review Comment:
[nitpick] Synchronizing on the `AtomicReference` instance is unconventional.
Consider using a dedicated lock object or an atomic compare-and-set for
thread-safe lazy initialization.
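Both alternatives the comment mentions can be sketched with plain `java.util.concurrent` types. This is a minimal sketch, not IoTDB code: it uses `Executors.newFixedThreadPool` in place of IoTDB's `WrappedThreadPoolExecutor`, and `LazyExecutorHolder`, `getWithLock`, and `getWithCas` are hypothetical names. The lock variant re-checks the reference inside a dedicated lock; the compare-and-set variant may create a losing pool under contention, so it shuts that pool down.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class LazyExecutorHolder {
  // Option 1: a dedicated lock object instead of synchronizing on the AtomicReference itself.
  private static final Object POOL_LOCK = new Object();
  private static final AtomicReference<ExecutorService> LOCKED_POOL = new AtomicReference<>();

  // Option 2: lock-free initialization with compare-and-set.
  private static final AtomicReference<ExecutorService> CAS_POOL = new AtomicReference<>();

  private LazyExecutorHolder() {}

  // threadCount only takes effect on the call that actually creates the pool.
  static ExecutorService getWithLock(int threadCount) {
    ExecutorService pool = LOCKED_POOL.get();
    if (pool == null) {
      synchronized (POOL_LOCK) {
        // Re-check under the lock: another thread may have created the pool meanwhile.
        pool = LOCKED_POOL.get();
        if (pool == null) {
          pool = Executors.newFixedThreadPool(threadCount);
          LOCKED_POOL.set(pool);
        }
      }
    }
    return pool;
  }

  static ExecutorService getWithCas(int threadCount) {
    ExecutorService existing = CAS_POOL.get();
    if (existing != null) {
      return existing;
    }
    ExecutorService candidate = Executors.newFixedThreadPool(threadCount);
    if (CAS_POOL.compareAndSet(null, candidate)) {
      return candidate;
    }
    // Another thread won the race; discard our pool so its threads cannot leak.
    candidate.shutdownNow();
    return CAS_POOL.get();
  }
}
```

The dedicated-lock variant never creates more than one pool, which is usually preferable for heavyweight objects such as thread pools; compare-and-set avoids locking at the cost of an occasional discarded pool.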
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java:
##########
@@ -133,25 +178,39 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
if (!tabletRawReqs.isEmpty()) {
try {
- final TSStatus result =
- executeInsertMultiTabletsWithRetry(
- tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());
+ final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement();
+ batchStatement.setInsertTabletStatementList(
+ tabletRawReqs.stream()
+ .map(
+ req ->
+ new LoadConvertedInsertTabletStatement(
+ req.constructStatement(),
+ loadTsFileStatement.isConvertOnTypeMismatch()))
+ .collect(Collectors.toList()));
+ executionFutures.add(executeInsertMultiTabletsWithRetry(batchStatement));
for (final long memoryCost : tabletRawReqSizes) {
block.reduceMemoryUsage(memoryCost);
}
tabletRawReqs.clear();
tabletRawReqSizes.clear();
-
- if (!handleTSStatus(result, loadTsFileStatement)) {
- return Optional.empty();
- }
} catch (final Exception e) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
return Optional.empty();
}
}
+
+ for (final Future<TSStatus> future : executionFutures) {
+ try {
+ if (!handleTSStatus(future.get(), loadTsFileStatement)) {
+ return Optional.empty();
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ LOGGER.warn("Exception occurs when executing insertion during tablet conversion: ", e);
+ return Optional.empty();
+ }
Review Comment:
[nitpick] The logic for processing futures is duplicated in
`LoadTableStatementDataTypeConvertExecutionVisitor`. Consider extracting a
shared helper to reduce code duplication.
```suggestion
if (!processFutures(executionFutures, loadTsFileStatement)) {
return Optional.empty();
```
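A shared helper along the lines the comment suggests might look like the following sketch. Assumptions: `processFutures` is the hypothetical name from the suggestion, the IoTDB-specific `handleTSStatus(status, statement)` call is abstracted as a `Predicate` (so a caller would pass `status -> handleTSStatus(status, loadTsFileStatement)` rather than the statement itself), and `java.util.logging` stands in for the visitors' logger. Unlike the loop under review, it restores the thread's interrupt flag when `InterruptedException` is caught.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;

final class FutureResults {
  private static final Logger LOGGER = Logger.getLogger(FutureResults.class.getName());

  private FutureResults() {}

  /**
   * Waits for each future in submission order and passes its result to statusHandler. Returns
   * false as soon as the handler rejects a result or a future fails; true if all results pass.
   */
  static <T> boolean processFutures(
      List<? extends Future<T>> futures, Predicate<? super T> statusHandler) {
    for (Future<T> future : futures) {
      try {
        if (!statusHandler.test(future.get())) {
          return false;
        }
      } catch (ExecutionException e) {
        LOGGER.log(
            Level.WARNING, "Exception occurs when executing insertion during tablet conversion", e);
        return false;
      } catch (InterruptedException e) {
        // Preserve the interrupt so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
        LOGGER.log(Level.WARNING, "Interrupted while waiting for tablet conversion insertion", e);
        return false;
      }
    }
    return true;
  }
}
```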
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java:
##########
@@ -99,27 +133,38 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(),
tabletWithIsAligned.getRight());
final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1;
- if (block.hasEnoughMemory(curMemory)) {
+ if (block.hasEnoughMemory(
+ curMemory
+ + TABLET_BATCH_MEMORY_SIZE_IN_BYTES
+ * Math.max(
+ 0,
+ (IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getLoadTsFileTabletConversionThreadCount()
+ - 1)))) {
Review Comment:
[nitpick] For every tablet, the memory check adds
`TABLET_BATCH_MEMORY_SIZE_IN_BYTES * (threadCount - 1)` to the tablet's own size,
which could over-reserve memory and block conversion. Consider calculating a
per-task threshold instead.
```suggestion
final long perTaskThreshold = TABLET_BATCH_MEMORY_SIZE_IN_BYTES
/ Math.max(
1,
IoTDBDescriptor.getInstance()
.getConfig()
.getLoadTsFileTabletConversionThreadCount());
if (block.hasEnoughMemory(curMemory + perTaskThreshold)) {
```
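The difference between the two reservations is easiest to see with numbers. This sketch assumes `TABLET_BATCH_MEMORY_SIZE_IN_BYTES` equals the `4096 * 1024` default of `loadTsFileTabletConversionBatchMemorySizeInBytes` shown earlier (the diff does not show where the constant comes from), and the class and method names are hypothetical.

```java
final class ConversionMemoryReservation {
  private ConversionMemoryReservation() {}

  // Extra bytes requested by the code under review: one batch for each additional thread.
  static long reservedForOtherThreads(long batchSizeInBytes, int threadCount) {
    return batchSizeInBytes * Math.max(0, threadCount - 1);
  }

  // Extra bytes requested by the per-task threshold, guarded against a zero thread count.
  static long perTaskThreshold(long batchSizeInBytes, int threadCount) {
    return batchSizeInBytes / Math.max(1, threadCount);
  }
}
```

With a 4,194,304-byte batch and 5 threads, the current check requests 16,777,216 extra bytes per tablet, while the per-task threshold requests 838,860. The two formulas answer different questions: the first appears to leave room for batches still held by the other conversion threads (a reading of the intent that is an assumption), so shrinking it may reduce blocking at the cost of a higher risk of exceeding the memory block.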
--
This is an automated message from the Apache Git Service.