dlmarion commented on code in PR #5399: URL: https://github.com/apache/accumulo/pull/5399#discussion_r1993355074
########## server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java: ########## @@ -266,37 +274,97 @@ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap, return result; } - private long removeFile(Path path) { - try { - if (!useTrash || !fs.moveToTrash(path)) { - fs.deleteRecursively(path); + private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter, + String msg) { + return deleteThreadPool.submit(() -> { + try { + log.debug(msg); + if (!useTrash || !fs.moveToTrash(path)) { + fs.deleteRecursively(path); + } + counter.incrementAndGet(); + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete {}", path, ex); } - return 1; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal {}", path, ex); - } - - return 0; + }); } private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) { + + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + + final Map<Path,Future<?>> futures = new HashMap<>(collection.size()); + final AtomicLong counter = new AtomicLong(); + for (Pair<WalState,Path> stateFile : collection) { Path path = stateFile.getSecond(); - log.debug("Removing {} WAL {}", stateFile.getFirst(), path); - status.currentLog.deleted += removeFile(path); + futures.put(path, removeFile(deleteThreadPool, path, counter, + "Removing " + stateFile.getFirst() + " WAL " + path)); + } + + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e); + } + } + } } - return status.currentLog.deleted; + deleteThreadPool.shutdown(); + try { + while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty + } + } catch (InterruptedException e1) { + log.error("{}", e1.getMessage(), e1); + } + status.currentLog.deleted += counter.get(); + return counter.get(); } private long removeFiles(Collection<Path> values) { - long count = 0; + + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(values.size()); + final AtomicLong counter = new AtomicLong(); + for (Path path : values) { - log.debug("Removing recovery log {}", path); - count += removeFile(path); + futures.put(path, + removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path)); + } + + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Uncaught exception deleting recovery log file" + f.getKey(), + e); + } + } + } + } + deleteThreadPool.shutdown(); Review Comment: Fixed this in 93f5c6d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org