Caideyipi commented on code in PR #16854:
URL: https://github.com/apache/iotdb/pull/16854#discussion_r2588202777
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
activeLoadDirScanner.start();
activeLoadMetricsCollector.start();
}
+
+ /**
+ * Clean up all listening directories for active load on DataNode first
startup. This method will
+ * clean up all files and subdirectories in the listening directories,
including: 1. Pending
+ * directories (configured by load_active_listening_dirs) 2. Pipe directory
(for pipe data sync)
+ * 3. Failed directory (for failed files)
+ */
+ public static void cleanupListeningDirectories() {
+ try {
+ final Set<String> dirsToClean = new HashSet<>();
+
+ try {
+ // Add configured listening dirs
+ if
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+ dirsToClean.addAll(
+ Arrays.asList(
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+ }
+
+ // Add pipe dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+ // Add failed dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ } catch (Exception e) {
+ return;
+ }
+
+ int totalFilesDeleted = 0;
+ int totalSubDirsDeleted = 0;
+
+ for (final String dirPath : dirsToClean) {
+ try {
+ final File dir = new File(dirPath);
+
+ if (!dir.exists() || !dir.isDirectory()) {
+ continue;
+ }
+
+ final long[] fileCount = {0};
+ final long[] subdirCount = {0};
+
+ Files.walkFileTree(
+ dir.toPath(),
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file,
BasicFileAttributes attrs) {
+ try {
+ Files.delete(file);
+ fileCount[0]++;
+ } catch (Exception e) {
+ // Ignore
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir,
IOException exc) {
+ if (exc == null &&
!dir.toFile().getAbsolutePath().equals(dirPath)) {
Review Comment:
DirPath may not be an absolute path...
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
activeLoadDirScanner.start();
activeLoadMetricsCollector.start();
}
+
+ /**
+ * Clean up all listening directories for active load on DataNode first
startup. This method will
+ * clean up all files and subdirectories in the listening directories,
including: 1. Pending
+ * directories (configured by load_active_listening_dirs) 2. Pipe directory
(for pipe data sync)
+ * 3. Failed directory (for failed files)
+ */
+ public static void cleanupListeningDirectories() {
+ try {
+ final Set<String> dirsToClean = new HashSet<>();
+
+ try {
+ // Add configured listening dirs
+ if
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+ dirsToClean.addAll(
+ Arrays.asList(
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+ }
+
+ // Add pipe dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+ // Add failed dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ } catch (Exception e) {
+ return;
+ }
+
+ int totalFilesDeleted = 0;
+ int totalSubDirsDeleted = 0;
+
+ for (final String dirPath : dirsToClean) {
+ try {
+ final File dir = new File(dirPath);
+
+ if (!dir.exists() || !dir.isDirectory()) {
+ continue;
+ }
+
+ final long[] fileCount = {0};
+ final long[] subdirCount = {0};
+
+ Files.walkFileTree(
+ dir.toPath(),
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file,
BasicFileAttributes attrs) {
+ try {
+ Files.delete(file);
+ fileCount[0]++;
+ } catch (Exception e) {
+ // Ignore
Review Comment:
Shall we log something for the failures?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
activeLoadDirScanner.start();
activeLoadMetricsCollector.start();
}
+
+ /**
+ * Clean up all listening directories for active load on DataNode first
startup. This method will
+ * clean up all files and subdirectories in the listening directories,
including: 1. Pending
+ * directories (configured by load_active_listening_dirs) 2. Pipe directory
(for pipe data sync)
+ * 3. Failed directory (for failed files)
+ */
+ public static void cleanupListeningDirectories() {
+ try {
+ final Set<String> dirsToClean = new HashSet<>();
+
+ try {
+ // Add configured listening dirs
+ if
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+ dirsToClean.addAll(
+ Arrays.asList(
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+ }
+
+ // Add pipe dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+ // Add failed dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ } catch (Exception e) {
+ return;
+ }
+
+ int totalFilesDeleted = 0;
+ int totalSubDirsDeleted = 0;
+
+ for (final String dirPath : dirsToClean) {
+ try {
+ final File dir = new File(dirPath);
+
+ if (!dir.exists() || !dir.isDirectory()) {
+ continue;
+ }
+
+ final long[] fileCount = {0};
+ final long[] subdirCount = {0};
+
+ Files.walkFileTree(
+ dir.toPath(),
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file,
BasicFileAttributes attrs) {
+ try {
+ Files.delete(file);
+ fileCount[0]++;
+ } catch (Exception e) {
+ // Ignore
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir,
IOException exc) {
+ if (exc == null &&
!dir.toFile().getAbsolutePath().equals(dirPath)) {
+ try {
+ Files.delete(dir);
+ subdirCount[0]++;
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFileFailed(Path file, IOException
exc) {
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
+ totalFilesDeleted += fileCount[0];
+ totalSubDirsDeleted += subdirCount[0];
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+
+ if (totalFilesDeleted > 0 || totalSubDirsDeleted > 0) {
+ LOGGER.info(
+ "Cleaned up active load listening directories, deleted {} files
and {} subdirectories",
+ totalFilesDeleted,
+ totalSubDirsDeleted);
+ }
+ } catch (Throwable t) {
+ // Ignore all errors to prevent any unexpected behavior
Review Comment:
May log something
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
activeLoadDirScanner.start();
activeLoadMetricsCollector.start();
}
+
+ /**
+ * Clean up all listening directories for active load on DataNode first
startup. This method will
+ * clean up all files and subdirectories in the listening directories,
including: 1. Pending
+ * directories (configured by load_active_listening_dirs) 2. Pipe directory
(for pipe data sync)
+ * 3. Failed directory (for failed files)
+ */
+ public static void cleanupListeningDirectories() {
+ try {
+ final Set<String> dirsToClean = new HashSet<>();
+
+ try {
+ // Add configured listening dirs
+ if
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+ dirsToClean.addAll(
+ Arrays.asList(
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+ }
+
+ // Add pipe dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+ // Add failed dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ } catch (Exception e) {
+ return;
+ }
+
+ int totalFilesDeleted = 0;
+ int totalSubDirsDeleted = 0;
+
+ for (final String dirPath : dirsToClean) {
+ try {
+ final File dir = new File(dirPath);
+
+ if (!dir.exists() || !dir.isDirectory()) {
+ continue;
+ }
+
+ final long[] fileCount = {0};
+ final long[] subdirCount = {0};
+
+ Files.walkFileTree(
+ dir.toPath(),
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file,
BasicFileAttributes attrs) {
+ try {
+ Files.delete(file);
+ fileCount[0]++;
+ } catch (Exception e) {
+ // Ignore
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir,
IOException exc) {
+ if (exc == null &&
!dir.toFile().getAbsolutePath().equals(dirPath)) {
+ try {
+ Files.delete(dir);
+ subdirCount[0]++;
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFileFailed(Path file, IOException
exc) {
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
+ totalFilesDeleted += fileCount[0];
+ totalSubDirsDeleted += subdirCount[0];
+ } catch (Exception e) {
+ // Ignore
Review Comment:
May log something
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java:
##########
@@ -48,4 +66,96 @@ public synchronized void start() {
activeLoadDirScanner.start();
activeLoadMetricsCollector.start();
}
+
+ /**
+ * Clean up all listening directories for active load on DataNode first
startup. This method will
+ * clean up all files and subdirectories in the listening directories,
including: 1. Pending
+ * directories (configured by load_active_listening_dirs) 2. Pipe directory
(for pipe data sync)
+ * 3. Failed directory (for failed files)
+ */
+ public static void cleanupListeningDirectories() {
+ try {
+ final Set<String> dirsToClean = new HashSet<>();
+
+ try {
+ // Add configured listening dirs
+ if
(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
+ dirsToClean.addAll(
+ Arrays.asList(
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+ }
+
+ // Add pipe dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+ // Add failed dir
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ } catch (Exception e) {
+ return;
Review Comment:
May warn something
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]