kevinrr888 commented on code in PR #5817: URL: https://github.com/apache/accumulo/pull/5817#discussion_r2293704935
##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -412,7 +418,9 @@ public void run() {
           }
         } finally {
           log.trace("A TransactionRunner is exiting...");
-          Preconditions.checkState(runningTxRunners.remove(this));
+          synchronized (runningTxRunners) {
+            Preconditions.checkState(runningTxRunners.remove(this));
+          }

Review Comment:
   No need to sync here, since `this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());` already makes a single `remove` call thread-safe.

##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -105,15 +105,22 @@ public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps
   protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer> poolConfigs,
       long idleCheckIntervalMillis) {
     final var pool = transactionExecutor;
-    final var runningTxRunners = getRunningTxRunners();
+    final var runningTxRunnersCopy = getRunningTxRunners();
     final int configured = poolConfigs.get(fateOps);
     ThreadPools.resizePool(pool, () -> configured, poolName);
-    final int needed = configured - runningTxRunners.size();
+    final int needed = configured - runningTxRunnersCopy.size();
     if (needed > 0) {
       // If the pool grew, then ensure that there is a TransactionRunner for each thread
       for (int i = 0; i < needed; i++) {
         try {
-          pool.execute(new TransactionRunner());
+          final TransactionRunner tr = new TransactionRunner();
+          synchronized (runningTxRunners) {
+            if (pool.isShutdown() || pool.isTerminating()) {
+              return;
+            }
+            runningTxRunners.add(tr);
+            pool.execute(tr);
+          }

Review Comment:
   I like this switch to adding the runner before calling execute; this is safer. Some comments:
   1) Should we maybe avoid the copy entirely and just lock `runningTxRunners` for the whole if-elseif-else block? Using both the copy and the real set seems a bit sketchy, but I'm not sure if it's actually wrong/a problem. The copy was sketching me out when I was working on #5813, so I removed it there as well.
   2) `pool.isShutdown() || pool.isTerminating()` --> `pool.isShutdown()` might be clearer. I'm pretty sure `pool.isTerminating()` is true only if `pool.isShutdown()` is true.

##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -105,15 +105,22 @@ public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps
   protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer> poolConfigs,
       long idleCheckIntervalMillis) {
     final var pool = transactionExecutor;
-    final var runningTxRunners = getRunningTxRunners();
+    final var runningTxRunnersCopy = getRunningTxRunners();
     final int configured = poolConfigs.get(fateOps);
     ThreadPools.resizePool(pool, () -> configured, poolName);
-    final int needed = configured - runningTxRunners.size();
+    final int needed = configured - runningTxRunnersCopy.size();
     if (needed > 0) {
       // If the pool grew, then ensure that there is a TransactionRunner for each thread
       for (int i = 0; i < needed; i++) {
         try {
-          pool.execute(new TransactionRunner());
+          final TransactionRunner tr = new TransactionRunner();
+          synchronized (runningTxRunners) {
+            if (pool.isShutdown() || pool.isTerminating()) {
+              return;
+            }
+            runningTxRunners.add(tr);
+            pool.execute(tr);
+          }
         } catch (RejectedExecutionException e) {
           // RejectedExecutionException could be shutting down
           if (pool.isShutdown()) {

Review Comment:
   I was thinking this case should probably be removed, but it might still be possible if the pool is shut down after the `if (pool.isShutdown() || pool.isTerminating())` check but before `pool.execute(tr);`. So I think we still need it.
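
For reference, the three points in the comments above can be tried out in isolation. The following is a minimal, standalone sketch, not the FateExecutor code: the class `RunnerTrackingSketch`, the `running` set, and the methods `removeRunner`, `submit`, and `demo` are all hypothetical names, and the pool is assumed to be a plain `ThreadPoolExecutor` with the default rejection policy. It shows that a single `remove` on a `Collections.synchronizedSet` needs no extra lock, that `isShutdown()` alone covers the terminating state, and that `execute` can still throw `RejectedExecutionException` after shutdown, so the catch has to stay.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RunnerTrackingSketch {

  // Hypothetical stand-in for the runningTxRunners field quoted in the review.
  static final Set<Runnable> running = Collections.synchronizedSet(new HashSet<>());

  // A single call on a synchronizedSet is already atomic, so no extra lock is needed.
  static boolean removeRunner(Runnable r) {
    return running.remove(r);
  }

  // Compound check-add-execute: hold the set's lock so the steps are not interleaved
  // with other users of the set. The pool's shutdown() does not take this lock, so
  // execute() may still be rejected and the registration must then be rolled back.
  static boolean submit(ThreadPoolExecutor pool, Runnable r) {
    synchronized (running) {
      if (pool.isShutdown()) { // also true while the pool is terminating
        return false;
      }
      running.add(r);
      try {
        pool.execute(r);
        return true;
      } catch (RejectedExecutionException e) {
        running.remove(r);
        return false;
      }
    }
  }

  // Runs each scenario once and reports whether it behaved as described.
  static boolean[] demo() {
    running.clear();
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Runnable r = () -> {};

    boolean submitted = submit(pool, r);
    boolean removed = removeRunner(r);

    pool.shutdown();
    boolean terminatingImpliesShutdown = !pool.isTerminating() || pool.isShutdown();

    boolean terminated;
    try {
      terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      terminated = false;
    }
    boolean shutdownOutlivesTerminating =
        terminated && pool.isShutdown() && !pool.isTerminating();

    // Calling execute() directly after shutdown models losing the race between
    // the isShutdown() check and the execute() call.
    boolean rejected = false;
    try {
      pool.execute(r);
    } catch (RejectedExecutionException e) {
      rejected = true;
    }

    boolean refusedCleanly = !submit(pool, r) && running.isEmpty();

    return new boolean[] {submitted, removed, terminatingImpliesShutdown,
        shutdownOutlivesTerminating, rejected, refusedCleanly};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(demo()));
  }
}
```

The sketch cannot force the real interleaving deterministically, so it models the lost race by calling `execute` on an already shut-down pool. Whether locking the set for the whole if-elseif-else block is preferable to the copy is a separate question that this example does not settle.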