dlmarion commented on code in PR #5399: URL: https://github.com/apache/accumulo/pull/5399#discussion_r1993355777
########## server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java: ########## @@ -283,42 +286,77 @@ private void removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong } catch (FileNotFoundException ex) { // ignored } catch (IOException ex) { - log.error("Unable to delete wal {}", path, ex); + log.error("Unable to delete {}", path, ex); } }); } private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) { final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS); + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + + final Map<Path,Future<?>> futures = new HashMap<>(collection.size()); final AtomicLong counter = new AtomicLong(); for (Pair<WalState,Path> stateFile : collection) { Path path = stateFile.getSecond(); - removeFile(deleteThreadPool, path, counter, - "Removing " + stateFile.getFirst() + " WAL " + path); + futures.put(path, removeFile(deleteThreadPool, path, counter, + "Removing " + stateFile.getFirst() + " WAL " + path)); } + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e); + } + } + } + } deleteThreadPool.shutdown(); try { while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty } } catch (InterruptedException e1) { log.error("{}", e1.getMessage(), e1); } + status.currentLog.deleted += counter.get(); return counter.get(); } private long removeFiles(Collection<Path> values) { + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - 
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS); + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(values.size()); final AtomicLong counter = new AtomicLong(); for (Path path : values) { - removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path); + futures.put(path, + removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path)); } + while (!futures.isEmpty()) {

Review Comment:
Added a sleep at the bottom of the while loop in 93f5c6d.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org