Copilot commented on code in PR #15715:
URL: https://github.com/apache/iotdb/pull/15715#discussion_r2144357360
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java:
##########
@@ -734,8 +734,8 @@ private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTra
* request. So for each sub-status which needs to redirect, we record the device path using the
* message field.
*/
- private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
- final TSStatus result = executeStatementAndClassifyExceptions(statement);
+ private TSStatus executeBatchStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
+ final TSStatus result = executeStatementAndClassifyExceptions(statement, 5);
Review Comment:
Consider extracting the hard-coded retry count (5) into a named constant to
enhance readability and ease future adjustments.
```suggestion
final TSStatus result = executeStatementAndClassifyExceptions(statement,
DEFAULT_RETRY_COUNT);
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java:
##########
@@ -771,15 +771,46 @@ private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement st
}
private TSStatus executeStatementAndClassifyExceptions(final Statement statement) {
+ return executeStatementAndClassifyExceptions(statement, 1);
+ }
+
+ private TSStatus executeStatementAndClassifyExceptions(
+ final Statement statement, final int tryCount) {
long estimatedMemory = 0L;
final double pipeReceiverActualToEstimatedMemoryRatio =
PIPE_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
try {
if (statement instanceof InsertBaseStatement) {
estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
- allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .forceAllocate((long) (estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio));
+ for (int i = 0; i < tryCount; ++i) {
+ try {
+ allocatedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocate(
+ (long) (estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio));
+ break;
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ if (i == tryCount - 1) {
+ final String message =
+ String.format(
+ "Temporarily out of memory when executing statement %s, Requested memory: %s, "
+ + "used memory: %s, free memory: %s, total non-floating memory: %s",
+ statement,
+ estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio,
+ PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+ PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+ PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
+ }
+ return new TSStatus(
+ TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(message);
+ } else {
+ Thread.sleep(100L * (i + 1));
Review Comment:
Consider handling InterruptedException explicitly in this retry loop:
restore the thread's interrupt status and stop retrying, so that the
interruption is not lost.
```suggestion
try {
  Thread.sleep(100L * (i + 1));
} catch (InterruptedException interruptedException) {
  // Restore the interrupted status
  Thread.currentThread().interrupt();
  LOGGER.warn(
      "Receiver id = {}: Thread interrupted during retry sleep.",
      receiverId.get(),
      interruptedException);
  return new TSStatus(
          TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
      .setMessage("Thread interrupted during retry sleep.");
}
```
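For reference, the retry pattern discussed in both comments can be sketched in isolation. This is only an illustration, not the receiver's actual code: `RetryWithBackoffSketch`, `TransientAllocationException`, `Outcome`, and `runWithRetry` are hypothetical names, the exception stands in for `PipeRuntimeOutOfMemoryCriticalException`, and the enum stands in for the returned `TSStatus`. `DEFAULT_RETRY_COUNT` is likewise assumed here and is not defined in the PR.

```java
public class RetryWithBackoffSketch {

  /** Hypothetical stand-in for PipeRuntimeOutOfMemoryCriticalException. */
  static class TransientAllocationException extends RuntimeException {
    TransientAllocationException(String message) {
      super(message);
    }
  }

  /** Hypothetical stand-in for the TSStatus the receiver would return. */
  enum Outcome {
    SUCCESS,
    TEMPORARILY_UNAVAILABLE,
    INTERRUPTED
  }

  /** Assumed named constant replacing the literal 5 at the call site. */
  static final int DEFAULT_RETRY_COUNT = 5;

  /**
   * Runs the allocation up to tryCount times, sleeping baseDelayMs * attempt
   * between failures (linear backoff, as in the diff above).
   */
  static Outcome runWithRetry(Runnable allocation, int tryCount, long baseDelayMs) {
    for (int i = 0; i < tryCount; ++i) {
      try {
        allocation.run();
        return Outcome.SUCCESS;
      } catch (TransientAllocationException e) {
        if (i == tryCount - 1) {
          // Last attempt failed: report a temporary failure to the caller.
          return Outcome.TEMPORARILY_UNAVAILABLE;
        }
        try {
          Thread.sleep(baseDelayMs * (i + 1));
        } catch (InterruptedException interrupted) {
          // Restore the interrupt status so callers can observe it, then stop retrying.
          Thread.currentThread().interrupt();
          return Outcome.INTERRUPTED;
        }
      }
    }
    // Reached only when tryCount <= 0, i.e. no attempt was made.
    return Outcome.TEMPORARILY_UNAVAILABLE;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    Outcome outcome =
        runWithRetry(
            () -> {
              // Fail the first two attempts, succeed on the third.
              if (++calls[0] < 3) {
                throw new TransientAllocationException("simulated out of memory");
              }
            },
            DEFAULT_RETRY_COUNT,
            1L);
    System.out.println(outcome + " after " + calls[0] + " attempts");
    // prints "SUCCESS after 3 attempts"
  }
}
```

Returning early on interruption keeps the thread responsive to shutdown; whether the receiver should instead propagate the exception is a design choice left to the PR author.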