kevinrr888 commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1949925574


##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -413,111 +324,81 @@ public void run() {
   }
 
   /**
-   * Creates a Fault-tolerant executor.
+   * Creates a Fault-tolerant executor for the given store type.
    *
+   * @param runDeadResCleaner Whether this FATE should run a dead reservation 
cleaner. The real
+   *        FATEs need have a cleaner, but may be undesirable in testing.
    * @param toLogStrFunc A function that converts Repo to Strings that are 
suitable for logging
    */
   public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
       Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
     this.store = FateLogger.wrap(store, toLogStrFunc, false);
-    this.environment = environment;
-    final ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createExecutorService(conf,
-        Property.MANAGER_FATE_THREADPOOL_SIZE, true);
-    this.workQueue = new LinkedTransferQueue<>();
-    this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
-    this.fatePoolWatcher =
+
+    this.fatePoolsWatcher =
         
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-    
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(()
 -> {
-      // resize the pool if the property changed
-      ThreadPools.resizePool(pool, conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
-      final int configured = 
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-      final int needed = configured - runningTxRunners.size();
-      if (needed > 0) {
-        // If the pool grew, then ensure that there is a TransactionRunner for 
each thread
-        for (int i = 0; i < needed; i++) {
-          try {
-            pool.execute(new TransactionRunner());
-          } catch (RejectedExecutionException e) {
-            // RejectedExecutionException could be shutting down
-            if (pool.isShutdown()) {
-              // The exception is expected in this case, no need to spam the 
logs.
-              log.trace("Error adding transaction runner to FaTE executor 
pool.", e);
-            } else {
-              // This is bad, FaTE may no longer work!
-              log.error("Error adding transaction runner to FaTE executor 
pool.", e);
-            }
-            break;
-          }
-        }
-        idleCountHistory.clear();
-      } else if (needed < 0) {
-        // If we need the pool to shrink, then ensure excess 
TransactionRunners are safely stopped.
-        // Flag the necessary number of TransactionRunners to safely stop when 
they are done work
-        // on a transaction.
-        int numFlagged =
-            (int) 
runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
-        int numToStop = -1 * (numFlagged + needed);
-        for (var runner : runningTxRunners) {
-          if (numToStop <= 0) {
-            break;
-          }
-          if (runner.flagStop()) {
-            log.trace("Flagging a TransactionRunner to stop...");
-            numToStop--;
-          }
-        }
-      } else {
-        // The property did not change, but should it based on idle Fate 
threads? Maintain
-        // count of the last X minutes of idle Fate threads. If zero 95% of 
the time, then suggest
-        // that the MANAGER_FATE_THREADPOOL_SIZE be increased.
-        final long interval = Math.min(60, TimeUnit.MILLISECONDS
-            
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
-        if (interval == 0) {
-          idleCountHistory.clear();
-        } else {
-          if (idleCountHistory.size() >= interval * 2) { // this task runs 
every 30s
-            int zeroFateThreadsIdleCount = 0;
-            for (Integer idleConsumerCount : idleCountHistory) {
-              if (idleConsumerCount == 0) {
-                zeroFateThreadsIdleCount++;
-              }
-            }
-            boolean needMoreThreads =
-                (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) 
>= 0.95;
-            if (needMoreThreads) {
-              log.warn(
-                  "All Fate threads appear to be busy for the last {} minutes,"
-                      + " consider increasing property: {}",
-                  interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
-              // Clear the history so that we don't log for interval minutes.
-              idleCountHistory.clear();
-            } else {
-              while (idleCountHistory.size() >= interval * 2) {
-                idleCountHistory.remove();
-              }
-            }
-          }
-          idleCountHistory.add(workQueue.getWaitingConsumerCount());
-        }
-      }
-    }, INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS));
-    this.transactionExecutor = pool;
+    ThreadPools.watchCriticalScheduledTask(
+        fatePoolsWatcher.scheduleWithFixedDelay(new 
FatePoolsWatcher(environment, conf),
+            INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), 
SECONDS));
 
     ScheduledExecutorService deadResCleanerExecutor = null;
     if (runDeadResCleaner) {
       // Create a dead reservation cleaner for this store that will 
periodically clean up
       // reservations held by dead processes, if they exist.
       deadResCleanerExecutor = 
ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
-          store.type() + "-dead-reservation-cleaner-pool");
+          store.type() == FateInstanceType.USER ? 
USER_DEAD_RESERVATION_CLEANER_POOL.poolName
+              : META_DEAD_RESERVATION_CLEANER_POOL.poolName);
       ScheduledFuture<?> deadReservationCleaner =
           deadResCleanerExecutor.scheduleWithFixedDelay(new 
DeadReservationCleaner(),
               INITIAL_DELAY.toSeconds(), getDeadResCleanupDelay().toSeconds(), 
SECONDS);
       ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
     }
     this.deadResCleanerExecutor = deadResCleanerExecutor;
 
-    this.workFinder = Threads.createThread("Fate work finder", new 
WorkFinder());
-    this.workFinder.start();
+    startFateExecutors(environment, conf, fateExecutors);
+  }
+
+  protected void startFateExecutors(T environment, AccumuloConfiguration conf,
+      Set<FateExecutor<T>> fateExecutors) {
+    for (var poolConf : getPoolConfigurations(conf).entrySet()) {
+      // no fate threads are running at this point; fine not to synchronize
+      fateExecutors
+          .add(new FateExecutor<>(this, environment, poolConf.getKey(), 
poolConf.getValue()));
+    }
+  }
+
+  /**
+   * Returns a map of the current pool configurations as set in the given 
config. Each key is a set
+   * of fate operations and each value is an integer for the number of threads 
assigned to work
+   * those fate operations.
+   */
+  protected Map<Set<FateOperation>,Integer> 
getPoolConfigurations(AccumuloConfiguration conf) {
+    Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
+    final var json = 
JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
+

Review Comment:
   If PropertyType behaves as I expect, execution should never reach this code. My 
understanding is that PropertyType is how a property is validated before it is 
ever set, which ensures an invalid value can never be set. If that is not how 
PropertyType works, this code could be reached. (Though I'm not sure what other 
purpose PropertyType would serve.)
   
   I added logging to PropertyType that explains why a property value failed to be set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to