Copilot commented on code in PR #15715:
URL: https://github.com/apache/iotdb/pull/15715#discussion_r2144357360


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java:
##########
@@ -734,8 +734,8 @@ private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTra
    * request. So for each sub-status which needs to redirect, we record the device path using the
    * message field.
    */
-  private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
-    final TSStatus result = executeStatementAndClassifyExceptions(statement);
+  private TSStatus executeBatchStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
+    final TSStatus result = executeStatementAndClassifyExceptions(statement, 5);

Review Comment:
   Consider extracting the hard-coded retry count (5) into a named constant, such as
   `DEFAULT_RETRY_COUNT`, so that its meaning is clear at the call site and the value can be
   adjusted in one place.
   ```suggestion
       final TSStatus result = executeStatementAndClassifyExceptions(statement, DEFAULT_RETRY_COUNT);
   ```
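
A minimal, standalone sketch of the shape this suggestion points at. `DEFAULT_RETRY_COUNT` is the name used in the suggestion and is not declared anywhere in the diff, so its declaration here is an assumption; the `String`-based methods are hypothetical stand-ins for the real `TSStatus` overloads.

```java
public class RetryConstantSketch {
    // Assumed declaration: the suggestion names this constant, the diff does not define it.
    // The value 5 mirrors the literal currently passed at the call site.
    static final int DEFAULT_RETRY_COUNT = 5;

    // Stand-in for the single-argument overload, which delegates with one attempt.
    static String execute(String statement) {
        return execute(statement, 1);
    }

    // Stand-in for the overload that takes an explicit attempt count.
    static String execute(String statement, int tryCount) {
        return statement + " (up to " + tryCount + " attempts)";
    }

    // Stand-in for the batch path, which now reads the named constant instead of a bare 5.
    static String executeBatch(String statement) {
        return execute(statement, DEFAULT_RETRY_COUNT);
    }

    public static void main(String[] args) {
        System.out.println(executeBatch("insert"));
    }
}
```

If the count should eventually become configurable, the constant gives a single place to swap in a configuration lookup.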



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java:
##########
@@ -771,15 +771,46 @@ private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement st
   }
 
   private TSStatus executeStatementAndClassifyExceptions(final Statement statement) {
+    return executeStatementAndClassifyExceptions(statement, 1);
+  }
+
+  private TSStatus executeStatementAndClassifyExceptions(
+      final Statement statement, final int tryCount) {
     long estimatedMemory = 0L;
     final double pipeReceiverActualToEstimatedMemoryRatio =
         PIPE_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
     try {
       if (statement instanceof InsertBaseStatement) {
         estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
-        allocatedMemoryBlock =
-            PipeDataNodeResourceManager.memory()
-                .forceAllocate((long) (estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio));
+        for (int i = 0; i < tryCount; ++i) {
+          try {
+            allocatedMemoryBlock =
+                PipeDataNodeResourceManager.memory()
+                    .forceAllocate(
+                        (long) (estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio));
+            break;
+          } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+            if (i == tryCount - 1) {
+              final String message =
+                  String.format(
+                      "Temporarily out of memory when executing statement %s, Requested memory: %s, "
+                          + "used memory: %s, free memory: %s, total non-floating memory: %s",
+                      statement,
+                      estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio,
+                      PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+                      PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+                      PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
+              }
+              return new TSStatus(
+                      TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                  .setMessage(message);
+            } else {
+              Thread.sleep(100L * (i + 1));

Review Comment:
   `Thread.sleep` throws the checked `InterruptedException`. Consider handling it explicitly
   in this retry loop: restore the thread's interrupted status and return early instead of
   continuing to retry.
   ```suggestion
                 try {
                   Thread.sleep(100L * (i + 1));
                 } catch (InterruptedException interruptedException) {
                   Thread.currentThread().interrupt(); // Restore the interrupted status
                   LOGGER.warn(
                       "Receiver id = {}: Thread interrupted during retry sleep.",
                       receiverId.get(),
                       interruptedException);
                   return new TSStatus(
                           TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
                       .setMessage("Thread interrupted during retry sleep.");
                 }
   ```
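
For reference, the retry pattern under discussion (bounded attempts, linearly growing sleep, early exit on interruption) can be sketched in isolation as below. This is not the receiver code: `InterruptAwareRetry`, `allocateWithRetry`, and the string results are hypothetical names chosen for illustration, and a `BooleanSupplier` returning `false` stands in for `forceAllocate` throwing `PipeRuntimeOutOfMemoryCriticalException`.

```java
import java.util.function.BooleanSupplier;

public class InterruptAwareRetry {
    // Matches the 100L multiplier in the diff; the constant name is an assumption.
    static final long BASE_BACKOFF_MS = 100L;

    /**
     * Calls tryAllocate up to tryCount times, sleeping baseBackoffMs * (attempt index + 1)
     * between failed attempts. Returns early if the thread is interrupted while sleeping.
     */
    static String allocateWithRetry(
            BooleanSupplier tryAllocate, int tryCount, long baseBackoffMs) {
        for (int i = 0; i < tryCount; ++i) {
            if (tryAllocate.getAsBoolean()) {
                return "OK";
            }
            if (i == tryCount - 1) {
                return "UNAVAILABLE: out of memory";
            }
            try {
                Thread.sleep(baseBackoffMs * (i + 1)); // linear backoff
            } catch (InterruptedException interruptedException) {
                // Restore the flag so callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                return "UNAVAILABLE: interrupted";
            }
        }
        // Only reached when tryCount <= 0, in which case no attempt was made.
        return "UNAVAILABLE: out of memory";
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Succeeds on the third attempt after sleeping 100 ms and then 200 ms.
        System.out.println(allocateWithRetry(() -> ++calls[0] >= 3, 5, BASE_BACKOFF_MS));
    }
}
```

Returning early on interruption keeps a shutting-down thread from sleeping through the remaining attempts, which is the main practical benefit of the suggestion.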



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
