keith-turner commented on code in PR #5399: URL: https://github.com/apache/accumulo/pull/5399#discussion_r1994190621
########## server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java: ########## @@ -266,37 +273,102 @@ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap, return result; } - private long removeFile(Path path) { - try { - if (!useTrash || !fs.moveToTrash(path)) { - fs.deleteRecursively(path); + private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter, + String msg) { + return deleteThreadPool.submit(() -> { + try { + log.debug(msg); + if (!useTrash || !fs.moveToTrash(path)) { + fs.deleteRecursively(path); + } + counter.incrementAndGet(); + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete {}", path, ex); } - return 1; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal {}", path, ex); - } - - return 0; + }); } private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) { - for (Pair<WalState,Path> stateFile : collection) { - Path path = stateFile.getSecond(); - log.debug("Removing {} WAL {}", stateFile.getFirst(), path); - status.currentLog.deleted += removeFile(path); + + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(collection.size()); + final AtomicLong counter = new AtomicLong(); + + try { + for (Pair<WalState,Path> stateFile : collection) { + Path path = stateFile.getSecond(); + futures.put(path, removeFile(deleteThreadPool, path, counter, + "Removing " + stateFile.getFirst() + " WAL " + path)); + } + + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e); + } + } + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sleeping", e); + } + } + } finally { + deleteThreadPool.shutdownNow(); } - return status.currentLog.deleted; + status.currentLog.deleted += counter.get(); + return counter.get(); } private long removeFiles(Collection<Path> values) { - long count = 0; + + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(values.size()); + final AtomicLong counter = new AtomicLong(); + for (Path path : values) { - log.debug("Removing recovery log {}", path); - count += removeFile(path); + futures.put(path, + removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path)); + } + + try { Review Comment: could move this try up to be immediately after the creation of deleteThreadPool -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org