keith-turner commented on code in PR #5399:
URL: https://github.com/apache/accumulo/pull/5399#discussion_r1994190621


##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -266,37 +273,102 @@ private long 
removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap,
     return result;
   }
 
-  private long removeFile(Path path) {
-    try {
-      if (!useTrash || !fs.moveToTrash(path)) {
-        fs.deleteRecursively(path);
+  private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, 
AtomicLong counter,
+      String msg) {
+    return deleteThreadPool.submit(() -> {
+      try {
+        log.debug(msg);
+        if (!useTrash || !fs.moveToTrash(path)) {
+          fs.deleteRecursively(path);
+        }
+        counter.incrementAndGet();
+      } catch (FileNotFoundException ex) {
+        // ignored
+      } catch (IOException ex) {
+        log.error("Unable to delete {}", path, ex);
       }
-      return 1;
-    } catch (FileNotFoundException ex) {
-      // ignored
-    } catch (IOException ex) {
-      log.error("Unable to delete wal {}", path, ex);
-    }
-
-    return 0;
+    });
   }
 
   private long removeFiles(Collection<Pair<WalState,Path>> collection, final 
GCStatus status) {
-    for (Pair<WalState,Path> stateFile : collection) {
-      Path path = stateFile.getSecond();
-      log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
-      status.currentLog.deleted += removeFile(path);
+
+    final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
+        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_WAL_THREADS);
+    final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
+    final AtomicLong counter = new AtomicLong();
+
+    try {
+      for (Pair<WalState,Path> stateFile : collection) {
+        Path path = stateFile.getSecond();
+        futures.put(path, removeFile(deleteThreadPool, path, counter,
+            "Removing " + stateFile.getFirst() + " WAL " + path));
+      }
+
+      while (!futures.isEmpty()) {
+        Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
+        while (iter.hasNext()) {
+          Entry<Path,Future<?>> f = iter.next();
+          if (f.getValue().isDone()) {
+            try {
+              iter.remove();
+              f.getValue().get();
+            } catch (InterruptedException | ExecutionException e) {
+              throw new RuntimeException("Uncaught exception deleting wal 
file" + f.getKey(), e);
+            }
+          }
+        }
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while sleeping", e);
+        }
+      }
+    } finally {
+      deleteThreadPool.shutdownNow();
     }
-    return status.currentLog.deleted;
+    status.currentLog.deleted += counter.get();
+    return counter.get();
   }
 
   private long removeFiles(Collection<Path> values) {
-    long count = 0;
+
+    final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
+        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_WAL_THREADS);
+    final Map<Path,Future<?>> futures = new HashMap<>(values.size());
+    final AtomicLong counter = new AtomicLong();
+
     for (Path path : values) {
-      log.debug("Removing recovery log {}", path);
-      count += removeFile(path);
+      futures.put(path,
+          removeFile(deleteThreadPool, path, counter, "Removing recovery log " 
+ path));
+    }
+
+    try {

Review Comment:
   could move this try up to be immediately after the creation of 
deleteThreadPool



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to