dlmarion commented on code in PR #5399: URL: https://github.com/apache/accumulo/pull/5399#discussion_r1992334192
########## server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java: ########## @@ -283,42 +286,77 @@ private void removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong } catch (FileNotFoundException ex) { // ignored } catch (IOException ex) { - log.error("Unable to delete wal {}", path, ex); + log.error("Unable to delete {}", path, ex); } }); } private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) { final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS); + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + + final Map<Path,Future<?>> futures = new HashMap<>(collection.size()); final AtomicLong counter = new AtomicLong(); for (Pair<WalState,Path> stateFile : collection) { Path path = stateFile.getSecond(); - removeFile(deleteThreadPool, path, counter, - "Removing " + stateFile.getFirst() + " WAL " + path); + futures.put(path, removeFile(deleteThreadPool, path, counter, + "Removing " + stateFile.getFirst() + " WAL " + path)); } + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e); + } + } + } + } deleteThreadPool.shutdown(); try { while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty } } catch (InterruptedException e1) { log.error("{}", e1.getMessage(), e1); } + status.currentLog.deleted += counter.get(); return counter.get(); } private long removeFiles(Collection<Path> values) { + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - 
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS); + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(values.size()); final AtomicLong counter = new AtomicLong(); for (Path path : values) { - removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path); + futures.put(path, + removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path)); } + while (!futures.isEmpty()) { Review Comment: My thinking was that the sooner I process the futures that are complete and remove them from the map, the sooner I give memory back to the VM. I probably need to put a wait at the bottom of the loop, though, so that it does not spin while no future is done yet. If I instead blocked on each future in iteration order, and the first one the iterator returned was the last one submitted, then I would be waiting until they were all done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org