dlmarion commented on code in PR #5399:
URL: https://github.com/apache/accumulo/pull/5399#discussion_r1993355777


##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -283,42 +286,77 @@ private void removeFile(ExecutorService deleteThreadPool, 
Path path, AtomicLong
       } catch (FileNotFoundException ex) {
         // ignored
       } catch (IOException ex) {
-        log.error("Unable to delete wal {}", path, ex);
+        log.error("Unable to delete {}", path, ex);
       }
     });
   }
 
   private long removeFiles(Collection<Pair<WalState,Path>> collection, final 
GCStatus status) {
 
     final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
-        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_THREADS);
+        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_WAL_THREADS);
+
+    final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
     final AtomicLong counter = new AtomicLong();
 
     for (Pair<WalState,Path> stateFile : collection) {
       Path path = stateFile.getSecond();
-      removeFile(deleteThreadPool, path, counter,
-          "Removing " + stateFile.getFirst() + " WAL " + path);
+      futures.put(path, removeFile(deleteThreadPool, path, counter,
+          "Removing " + stateFile.getFirst() + " WAL " + path));
     }
 
+    while (!futures.isEmpty()) {
+      Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
+      while (iter.hasNext()) {
+        Entry<Path,Future<?>> f = iter.next();
+        if (f.getValue().isDone()) {
+          try {
+            iter.remove();
+            f.getValue().get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Uncaught exception deleting wal file" 
+ f.getKey(), e);
+          }
+        }
+      }
+    }
     deleteThreadPool.shutdown();
     try {
       while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) 
{ // empty
       }
     } catch (InterruptedException e1) {
       log.error("{}", e1.getMessage(), e1);
     }
+    status.currentLog.deleted += counter.get();
     return counter.get();
   }
 
   private long removeFiles(Collection<Path> values) {
+
     final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
-        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_THREADS);
+        .createExecutorService(context.getConfiguration(), 
Property.GC_DELETE_WAL_THREADS);
+    final Map<Path,Future<?>> futures = new HashMap<>(values.size());
     final AtomicLong counter = new AtomicLong();
 
     for (Path path : values) {
-      removeFile(deleteThreadPool, path, counter, "Removing recovery log " + 
path);
+      futures.put(path,
+          removeFile(deleteThreadPool, path, counter, "Removing recovery log " 
+ path));
     }
 
+    while (!futures.isEmpty()) {

Review Comment:
   Added a sleep at the bottom of the while loop in 93f5c6d.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to