Caideyipi commented on code in PR #16854:
URL: https://github.com/apache/iotdb/pull/16854#discussion_r2588202777


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        return;
+      }
+
+      int totalFilesDeleted = 0;
+      int totalSubDirsDeleted = 0;
+
+      for (final String dirPath : dirsToClean) {
+        try {
+          final File dir = new File(dirPath);
+
+          if (!dir.exists() || !dir.isDirectory()) {
+            continue;
+          }
+
+          final long[] fileCount = {0};
+          final long[] subdirCount = {0};
+
+          Files.walkFileTree(
+              dir.toPath(),
+              new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) {
+                  try {
+                    Files.delete(file);
+                    fileCount[0]++;
+                  } catch (Exception e) {
+                    // Ignore
+                  }
+                  return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, 
IOException exc) {
+                  if (exc == null && 
!dir.toFile().getAbsolutePath().equals(dirPath)) {

Review Comment:
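   `dirPath` may not be an absolute path. In that case `dir.toFile().getAbsolutePath().equals(dirPath)` never matches, so the root listening directory itself would also be deleted.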



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        return;
+      }
+
+      int totalFilesDeleted = 0;
+      int totalSubDirsDeleted = 0;
+
+      for (final String dirPath : dirsToClean) {
+        try {
+          final File dir = new File(dirPath);
+
+          if (!dir.exists() || !dir.isDirectory()) {
+            continue;
+          }
+
+          final long[] fileCount = {0};
+          final long[] subdirCount = {0};
+
+          Files.walkFileTree(
+              dir.toPath(),
+              new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) {
+                  try {
+                    Files.delete(file);
+                    fileCount[0]++;
+                  } catch (Exception e) {
+                    // Ignore

Review Comment:
   Shall we log something for the failures?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        return;
+      }
+
+      int totalFilesDeleted = 0;
+      int totalSubDirsDeleted = 0;
+
+      for (final String dirPath : dirsToClean) {
+        try {
+          final File dir = new File(dirPath);
+
+          if (!dir.exists() || !dir.isDirectory()) {
+            continue;
+          }
+
+          final long[] fileCount = {0};
+          final long[] subdirCount = {0};
+
+          Files.walkFileTree(
+              dir.toPath(),
+              new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) {
+                  try {
+                    Files.delete(file);
+                    fileCount[0]++;
+                  } catch (Exception e) {
+                    // Ignore
+                  }
+                  return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, 
IOException exc) {
+                  if (exc == null && 
!dir.toFile().getAbsolutePath().equals(dirPath)) {
+                    try {
+                      Files.delete(dir);
+                      subdirCount[0]++;
+                    } catch (Exception e) {
+                      // Ignore
+                    }
+                  }
+                  return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult visitFileFailed(Path file, IOException 
exc) {
+                  return FileVisitResult.CONTINUE;
+                }
+              });
+
+          totalFilesDeleted += fileCount[0];
+          totalSubDirsDeleted += subdirCount[0];
+        } catch (Exception e) {
+          // Ignore
+        }
+      }
+
+      if (totalFilesDeleted > 0 || totalSubDirsDeleted > 0) {
+        LOGGER.info(
+            "Cleaned up active load listening directories, deleted {} files 
and {} subdirectories",
+            totalFilesDeleted,
+            totalSubDirsDeleted);
+      }
+    } catch (Throwable t) {
+      // Ignore all errors to prevent any unexpected behavior

Review Comment:
   We may want to log something here instead of silently ignoring the error.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        return;
+      }
+
+      int totalFilesDeleted = 0;
+      int totalSubDirsDeleted = 0;
+
+      for (final String dirPath : dirsToClean) {
+        try {
+          final File dir = new File(dirPath);
+
+          if (!dir.exists() || !dir.isDirectory()) {
+            continue;
+          }
+
+          final long[] fileCount = {0};
+          final long[] subdirCount = {0};
+
+          Files.walkFileTree(
+              dir.toPath(),
+              new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) {
+                  try {
+                    Files.delete(file);
+                    fileCount[0]++;
+                  } catch (Exception e) {
+                    // Ignore
+                  }
+                  return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, 
IOException exc) {
+                  if (exc == null && 
!dir.toFile().getAbsolutePath().equals(dirPath)) {
+                    try {
+                      Files.delete(dir);
+                      subdirCount[0]++;
+                    } catch (Exception e) {
+                      // Ignore
+                    }
+                  }
+                  return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult visitFileFailed(Path file, IOException 
exc) {
+                  return FileVisitResult.CONTINUE;
+                }
+              });
+
+          totalFilesDeleted += fileCount[0];
+          totalSubDirsDeleted += subdirCount[0];
+        } catch (Exception e) {
+          // Ignore

Review Comment:
   We may want to log something here instead of silently ignoring the exception.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final Set<String> dirsToClean = new HashSet<>();
+
+      try {
+        // Add configured listening dirs
+        if 
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+          dirsToClean.addAll(
+              Arrays.asList(
+                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+        }
+
+        // Add pipe dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+        // Add failed dir
+        
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+      } catch (Exception e) {
+        return;

Review Comment:
   We may want to log a warning here before returning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to