keith-turner commented on code in PR #5263:
URL: https://github.com/apache/accumulo/pull/5263#discussion_r1917453007


##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -179,9 +187,25 @@ public void run() {
   }
 
   private class TransactionRunner implements Runnable {
+    private final AtomicReference<RunnerState> runnerState =
+        new AtomicReference<>(new RunnerState(false, false));
+
+    private class RunnerState {
+      // used to signal a TransactionRunner to stop in the case where there are too many running
+      // i.e., the property for the pool size decreased and we have excess TransactionRunners
+      private final boolean stop;
+      // whether the TransactionRunner is executing the run() method (may or may not be working on
+      // a tx, could be waiting for a tx)
+      private final boolean isRunning;

Review Comment:
   I wonder about changing this to track `exited` instead, which would be set 
to true in a finally block. `isRunning` can be false both before the thread 
has started and after it has exited; tracking only whether it has exited would 
distinguish between these cases. Then, if a thread was not stopped but exited 
because of an exception, we would know it has finished.
   
   ```suggestion
         private final boolean exited;
   ```
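
A minimal sketch of how the `exited` flag could behave, assuming the state stays an immutable object swapped through an `AtomicReference` as in the diff. `SketchRunner`, its `work` field, and the accessor names are hypothetical stand-ins, not the PR's code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Fate.TransactionRunner; illustrates the flag only.
class SketchRunner implements Runnable {
  // Immutable snapshot of both flags, replaced atomically as a unit.
  static final class RunnerState {
    final boolean stop;
    final boolean exited;

    RunnerState(boolean stop, boolean exited) {
      this.stop = stop;
      this.exited = exited;
    }
  }

  private final AtomicReference<RunnerState> state =
      new AtomicReference<>(new RunnerState(false, false));
  private final Runnable work; // assumed unit of work; may throw

  SketchRunner(Runnable work) {
    this.work = work;
  }

  @Override
  public void run() {
    try {
      while (!state.get().stop) {
        work.run();
      }
    } finally {
      // Executes on normal return and on exception, so exited covers both.
      state.updateAndGet(s -> new RunnerState(s.stop, true));
    }
  }

  // Returns true only if this call changed stop from false to true.
  boolean setStop() {
    RunnerState prev = state.getAndUpdate(s -> s.stop ? s : new RunnerState(true, s.exited));
    return !prev.stop;
  }

  boolean isStopped() {
    return state.get().stop;
  }

  boolean isExited() {
    return state.get().exited;
  }
}
```

With this shape, a runner that has not started yet reports `isExited() == false`, while one that died from an exception reports `isExited() == true` with `isStopped() == false`.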



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -425,11 +474,36 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
           }
         }
         idleCountHistory.clear();
+      } else if (needed < 0) {
+        // If we need the pool to shrink, then ensure excess TransactionRunners are safely stopped.

Review Comment:
   Maybe instead of trying to stop and then unstop threads and sleep, this 
could just try to get stop set on a goal number of threads, without waiting 
for them to exit. Maybe something like the following.
   
   
   ```java
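           // count() returns a long, so narrow it explicitly
           int numStopped = (int) runningTxRunners.stream().filter(tr -> tr.isStopped()).count();
           // needed is negative here: excess runners minus those already told to stop
           int numToStop = -1 * (numStopped + needed);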
           for(var tr : runningTxRunners) {
             if(numToStop <= 0) {
               break;
             }
             
             // returns true if it was not already stopped
             if(tr.setStop()) {
               numToStop--;
             }
           }
   ```
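
A self-contained sketch of the same idea, for checking the arithmetic. `StoppableRunner` and `ShrinkSketch.flagExcessRunners` are hypothetical names, and it assumes `needed` is computed as the configured count minus the list size, with stopped-but-not-yet-exited runners still in the list.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical minimal runner: only the stop flag matters for this sketch.
class StoppableRunner {
  private final AtomicBoolean stop = new AtomicBoolean(false);

  boolean isStopped() {
    return stop.get();
  }

  // Returns true only for the call that flips the flag from false to true.
  boolean setStop() {
    return stop.compareAndSet(false, true);
  }
}

class ShrinkSketch {
  // needed = configured - runners.size(); negative when there are excess runners.
  // Returns how many runners were newly flagged; does not wait for any to exit.
  static int flagExcessRunners(List<StoppableRunner> runners, int needed) {
    int numStopped = (int) runners.stream().filter(StoppableRunner::isStopped).count();
    int numToStop = -1 * (numStopped + needed);
    int flagged = 0;
    for (StoppableRunner tr : runners) {
      if (numToStop <= 0) {
        break;
      }
      if (tr.setStop()) {
        numToStop--;
        flagged++;
      }
    }
    return flagged;
  }
}
```

Because runners that are already flagged count toward the goal, calling this repeatedly with the same `needed` is idempotent: the second call finds nothing left to flag.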



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -397,24 +443,27 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
       Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
     this.store = FateLogger.wrap(store, toLogStrFunc, false);
     this.environment = environment;
-    final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
+    this.transactionExecutor = ThreadPools.getServerThreadPools().createExecutorService(conf,
         Property.MANAGER_FATE_THREADPOOL_SIZE, true);
     this.workQueue = new LinkedTransferQueue<>();
+    this.runningTxRunners = new ArrayList<>();
     this.fatePoolWatcher =
         ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
       // resize the pool if the property changed
-      ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
-      // If the pool grew, then ensure that there is a TransactionRunner for each thread
+      ThreadPools.resizePool(transactionExecutor, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
       final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-      final int needed = configured - pool.getActiveCount();
+      final int needed = configured - transactionExecutor.getActiveCount();

Review Comment:
   Maybe instead of using the active count, this could use the size of the 
list, cleaning it up beforehand if the exited tracking was added.
   
   ```suggestion
         // remove any threads that have exited
         runningTxRunners.removeIf(tr->tr.isExited());
         final int needed = configured - runningTxRunners.size();
   ```
   
   Also wondering about creating a cached thread pool (see 
`Executors.newCachedThreadPool()`) and not bothering to resize the pool: just 
add runners to the pool and stop them, and it will create threads as needed. 
That may be too much of a change for this PR, but it would help simplify the 
code by reducing the number of things that need to be kept in sync.
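
A rough sketch of what the cached-pool variant might look like, combining it with the exited cleanup and the stop-goal loop from the comments above. Everything here other than `Executors.newCachedThreadPool()` is an assumption for illustration: `CachedPoolSketch`, `Runner`, `reconcile`, and `activeRunners` are hypothetical names, and the runner just sleeps where the real one would poll the work queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CachedPoolSketch {
  // Hypothetical runner: idles until asked to stop, then records that it exited.
  static final class Runner implements Runnable {
    private final AtomicBoolean stop = new AtomicBoolean(false);
    private volatile boolean exited = false;

    @Override
    public void run() {
      try {
        while (!stop.get()) {
          Thread.sleep(5); // stand-in for polling the work queue
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        exited = true;
      }
    }

    boolean setStop() {
      return stop.compareAndSet(false, true);
    }

    boolean isStopped() {
      return stop.get();
    }

    boolean isExited() {
      return exited;
    }
  }

  // The cached pool creates threads on demand, so it never needs resizing.
  private final ExecutorService pool = Executors.newCachedThreadPool();
  private final List<Runner> runners = new ArrayList<>();

  // Intended to be called periodically with the configured runner count.
  synchronized void reconcile(int configured) {
    runners.removeIf(Runner::isExited);
    int needed = configured - runners.size();
    if (needed > 0) {
      for (int i = 0; i < needed; i++) {
        Runner r = new Runner();
        runners.add(r);
        pool.execute(r);
      }
    } else if (needed < 0) {
      int numStopped = (int) runners.stream().filter(Runner::isStopped).count();
      int numToStop = -1 * (numStopped + needed);
      for (Runner r : runners) {
        if (numToStop <= 0) {
          break;
        }
        if (r.setStop()) {
          numToStop--;
        }
      }
    }
  }

  // Runners that have not been asked to stop.
  synchronized int activeRunners() {
    return (int) runners.stream().filter(r -> !r.isStopped()).count();
  }

  void shutdown() {
    synchronized (this) {
      runners.forEach(Runner::setStop);
    }
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

In this shape the only state to keep consistent is the runner list; the pool's thread count simply follows the number of submitted runners that are still executing.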
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
