jt2594838 commented on code in PR #16854:
URL: https://github.com/apache/iotdb/pull/16854#discussion_r2591153599


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,108 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();

Review Comment:
   Why use a set? Is there a chance that dirs may be duplicated?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,108 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        LOGGER.warn("Failed to get active load listening directories 
configuration", e);
+        return;
+      }
+
+      int totalFilesDeleted = 0;
+      int totalSubDirsDeleted = 0;

Review Comment:
   Add `num` to the variable names to indicate that they are counts, not sets
   of files.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,108 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        LOGGER.warn("Failed to get active load listening directories 
configuration", e);
+        return;
+      }
+
+      int totalFilesDeleted = 0;
+      int totalSubDirsDeleted = 0;
+
+      for (final String dirPath : dirsToClean) {
+        try {
+          final File dir = new File(dirPath);
+
+          if (!dir.exists() || !dir.isDirectory()) {
+            continue;
+          }
+
+          // Convert to absolute path for comparison
+          final String absoluteDirPath = dir.getAbsolutePath();
+
+          final long[] fileCount = {0};
+          final long[] subdirCount = {0};
+
+          Files.walkFileTree(
+              dir.toPath(),
+              new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) {
+                  try {
+                    Files.delete(file);
+                    fileCount[0]++;
+                  } catch (Exception e) {
+                    LOGGER.debug("Failed to delete file: {}", 
file.toAbsolutePath(), e);
+                  }
+                  return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path subDir, 
IOException exc) {
+                  if (exc != null) {
+                    LOGGER.debug(
+                        "Error occurred while visiting directory: {}",
+                        subDir.toAbsolutePath(),
+                        exc);
+                    return FileVisitResult.CONTINUE;
+                  }
+                  if 
(!subDir.toFile().getAbsolutePath().equals(absoluteDirPath)) {
+                    try {
+                      Files.delete(subDir);
+                      subdirCount[0]++;
+                    } catch (Exception e) {
+                      LOGGER.debug("Failed to delete directory: {}", 
subDir.toAbsolutePath(), e);
+                    }
+                  }
+                  return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult visitFileFailed(Path file, IOException 
exc) {
+                  LOGGER.debug("Failed to visit file: {}", 
file.toAbsolutePath(), exc);
+                  return FileVisitResult.CONTINUE;
+                }
+              });
+
+          totalFilesDeleted += fileCount[0];
+          totalSubDirsDeleted += subdirCount[0];
+        } catch (Exception e) {
+          LOGGER.warn("Failed to cleanup directory: {}", dirPath, e);
+        }
+      }
+
+      if (totalFilesDeleted > 0 || totalSubDirsDeleted > 0) {
+        LOGGER.info(
+            "Cleaned up active load listening directories, deleted {} files 
and {} subdirectories",
+            totalFilesDeleted,
+            totalSubDirsDeleted);
+      }

Review Comment:
   Why bother writing this traversal by hand instead of using
   `org.apache.iotdb.commons.utils.FileUtils#deleteFileOrDirectory(java.io.File, boolean)`?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        return;

Review Comment:
   How could an exception be thrown here?
   I only see set operations and getter calls.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to