keith-turner commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1956570688


##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -144,260 +141,175 @@ public TFateOperation toThrift() {
       }
       return top;
     }
+
+    public static Set<FateOperation> getAllUserFateOps() {
+      return allUserFateOps;
+    }
+
+    public static Set<FateOperation> getAllMetaFateOps() {
+      return allMetaFateOps;
+    }
   }
 
-  /**
-   * A single thread that finds transactions to work on and queues them up. Do 
not want each worker
-   * thread going to the store and looking for work as it would place more 
load on the store.
-   */
-  private class WorkFinder implements Runnable {
+  // The fate pools watcher:
+  // - Maintains a TransactionRunner per available thread per 
pool/FateExecutor. Does so by
+  // periodically checking the pools for an inactive thread (i.e., a thread 
running a
+  // TransactionRunner died or the pool size was increased in the property), 
resizing the pool and
+  // submitting new runners as needed. Also safely stops the necessary number 
of TransactionRunners
+  // if the pool size in the property was decreased.
+  // - Warns the user to consider increasing the pool size (or splitting the 
fate ops assigned to
+  // that pool into separate pools) for any pool that does not often have any 
idle threads.
+  private class FatePoolsWatcher implements Runnable {
+    private final T environment;
+    private final AccumuloConfiguration conf;
+
+    private FatePoolsWatcher(T environment, AccumuloConfiguration conf) {
+      this.environment = environment;
+      this.conf = conf;
+    }
 
     @Override
     public void run() {
-      while (keepRunning.get()) {
-        try {
-          store.runnable(keepRunning, fateId -> {
-            while (keepRunning.get()) {
-              try {
-                // The reason for calling transfer instead of queueing is 
avoid rescanning the
-                // storage layer and adding the same thing over and over. For 
example if all threads
-                // were busy, the queue size was 100, and there are three 
runnable things in the
-                // store. Do not want to keep scanning the store adding those 
same 3 runnable things
-                // until the queue is full.
-                if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) {
-                  break;
-                }
-              } catch (InterruptedException e) {
-                throw new IllegalStateException(e);
-              }
+      // Read from the config here and here only. Must avoid reading the same 
property from the
+      // config more than once since it can change at any point in this 
execution
+      var poolConfigs = getPoolConfigurations(conf);
+      var idleCheckIntervalMillis = 
conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
+
+      // shutdown task: shutdown fate executors whose set of fate operations 
are no longer present
+      // in the config
+      synchronized (fateExecutors) {
+        final var fateExecutorsIter = fateExecutors.iterator();
+        while (fateExecutorsIter.hasNext()) {
+          var fateExecutor = fateExecutorsIter.next();
+
+          // if this fate executors set of fate ops is no longer present in 
the config...
+          if (!poolConfigs.containsKey(fateExecutor.getFateOps())) {
+            if (!fateExecutor.isShutdown()) {
+              log.debug("The config for {} has changed invalidating {}. 
Gracefully shutting down "
+                  + "the FateExecutor.", getFateConfigProp(), fateExecutor);
+              fateExecutor.initiateShutdown();
+            } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
+              log.debug("{} has been shutdown, but is still actively working 
on transactions.",
+                  fateExecutor);
+            } else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) {
+              log.debug("{} has been shutdown and all threads have safely 
terminated.",
+                  fateExecutor);
+              fateExecutorsIter.remove();
             }
-          });
-        } catch (Exception e) {
-          if (keepRunning.get()) {
-            log.warn("Failure while attempting to find work for fate", e);
-          } else {
-            log.debug("Failure while attempting to find work for fate", e);
           }
-
-          workQueue.clear();
         }
       }
-    }
-  }
-
-  private class TransactionRunner implements Runnable {
-    // used to signal a TransactionRunner to stop in the case where there are 
too many running
-    // i.e., the property for the pool size decreased and we have excess 
TransactionRunners
-    private final AtomicBoolean stop = new AtomicBoolean(false);
 
-    private Optional<FateTxStore<T>> reserveFateTx() throws 
InterruptedException {
-      while (keepRunning.get() && !stop.get()) {
-        FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);
-
-        if (unreservedFateId == null) {
-          continue;
-        }
-        var optionalopStore = store.tryReserve(unreservedFateId);
-        if (optionalopStore.isPresent()) {
-          return optionalopStore;
+      // replacement task: at this point, the existing FateExecutors that were 
invalidated by the
+      // config changes have started shutdown or finished shutdown. Now create 
any new replacement
+      // FateExecutors needed
+      for (var poolConfig : poolConfigs.entrySet()) {
+        var configFateOps = poolConfig.getKey();
+        var configPoolSize = poolConfig.getValue();
+        synchronized (fateExecutors) {
+          if (fateExecutors.stream().map(FateExecutor::getFateOps)
+              .noneMatch(fo -> fo.equals(configFateOps))) {
+            fateExecutors
+                .add(new FateExecutor<>(Fate.this, environment, configFateOps, 
configPoolSize));
+          }
         }
       }
 
-      return Optional.empty();
-    }
-
-    @Override
-    public void run() {
-      runningTxRunners.add(this);
-      try {
-        while (keepRunning.get() && !stop.get()) {
-          FateTxStore<T> txStore = null;
-          ExecutionState state = new ExecutionState();
-          try {
-            var optionalopStore = reserveFateTx();
-            if (optionalopStore.isPresent()) {
-              txStore = optionalopStore.orElseThrow();
-            } else {
-              continue;
-            }
-            state.status = txStore.getStatus();
-            state.op = txStore.top();
-            if (state.status == FAILED_IN_PROGRESS) {
-              processFailed(txStore, state.op);
-            } else if (state.status == SUBMITTED || state.status == 
IN_PROGRESS) {
+      // resize task: For each fate executor, resize the pool to match the 
config as necessary and
+      // submit new TransactionRunners if the pool grew, stop 
TransactionRunners if the pool
+      // shrunk, and potentially suggest resizing the pool if the load is 
consistently high.
+      synchronized (fateExecutors) {
+        for (var fateExecutor : fateExecutors) {
+          if (fateExecutor.isShutdown()) {
+            continue;
+          }
+          final var pool = fateExecutor.getTransactionExecutor();
+          final var poolName = fateExecutor.getPoolName();
+          final var runningTxRunners = fateExecutor.getRunningTxRunners();
+          final int configured = poolConfigs.get(fateExecutor.getFateOps());
+          ThreadPools.resizePool(pool, () -> configured, poolName);
+          final int needed = configured - runningTxRunners.size();

Review Comment:
   Could all of this code for resizing be pushed into FateExecutor?  It seems like 
a lot of things are requested from fateExecutor for the code here, but I am not sure 
if this code is exclusively working with refs from fateExecutor.  Wondering if here 
we could call `fateExecutor.updateSizeConfig(configured)` and then the 
method inside FateExecutor takes care of starting/stopping threads if 
needed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to