keith-turner commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1945549409


##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -429,15 +429,34 @@ public enum Property {
   MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval", "60s",
       PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper to update interval.",
       "1.9.3"),
-  MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64", PropertyType.COUNT,
-      "The number of threads used to run fault-tolerant executions (FATE)."
-          + " These are primarily table operations like merge.",
-      "1.4.3"),
+  MANAGER_USER_FATE_CONFIG("manager.user.fate.config", "{"

Review Comment:
   It would be good to move `fate` further left in the property name, so that when the props are sorted the FATE ones end up next to each other.
   
   ```suggestion
     MANAGER_USER_FATE_CONFIG("manager.fate.user.config", "{"
   ```
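To illustrate the sorting point with the keys from this diff, the sketch below sorts them the way a plain `TreeSet` would. `manager.other.prop` is a hypothetical stand-in for any unrelated manager property, and `manager.fate.meta.config` assumes the same rename is applied to the meta property, which the suggestion above does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class PropertySortDemo {
  // Returns the keys in natural (lexicographic) order, as a sorted property listing would show them.
  static List<String> sorted(List<String> keys) {
    return new ArrayList<>(new TreeSet<>(keys));
  }

  public static void main(String[] args) {
    // "manager.other.prop" is a hypothetical unrelated manager property.
    List<String> asProposed = List.of("manager.fate.metrics.min.update.interval",
        "manager.user.fate.config", "manager.meta.fate.config", "manager.other.prop");
    // Assumes the meta property gets the same "fate"-first rename as the user one.
    List<String> asSuggested = List.of("manager.fate.metrics.min.update.interval",
        "manager.fate.user.config", "manager.fate.meta.config", "manager.other.prop");

    // [manager.fate.metrics.min.update.interval, manager.meta.fate.config, manager.other.prop, manager.user.fate.config]
    System.out.println(sorted(asProposed));
    // [manager.fate.meta.config, manager.fate.metrics.min.update.interval, manager.fate.user.config, manager.other.prop]
    System.out.println(sorted(asSuggested));
  }
}
```

With the proposed names an unrelated property lands between the two FATE configs; with `fate` moved left all three FATE properties are contiguous.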



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -413,111 +324,81 @@ public void run() {
   }
 
   /**
-   * Creates a Fault-tolerant executor.
+   * Creates a Fault-tolerant executor for the given store type.
    *
+   * @param runDeadResCleaner Whether this FATE should run a dead reservation cleaner. The real
+   *        FATEs need have a cleaner, but may be undesirable in testing.
    * @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging
    */
   public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
       Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
     this.store = FateLogger.wrap(store, toLogStrFunc, false);
-    this.environment = environment;
-    final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
-        Property.MANAGER_FATE_THREADPOOL_SIZE, true);
-    this.workQueue = new LinkedTransferQueue<>();
-    this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
-    this.fatePoolWatcher =
+
+    this.fatePoolsWatcher =
         ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-    ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
-      // resize the pool if the property changed
-      ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
-      final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-      final int needed = configured - runningTxRunners.size();
-      if (needed > 0) {
-        // If the pool grew, then ensure that there is a TransactionRunner for each thread
-        for (int i = 0; i < needed; i++) {
-          try {
-            pool.execute(new TransactionRunner());
-          } catch (RejectedExecutionException e) {
-            // RejectedExecutionException could be shutting down
-            if (pool.isShutdown()) {
-              // The exception is expected in this case, no need to spam the logs.
-              log.trace("Error adding transaction runner to FaTE executor pool.", e);
-            } else {
-              // This is bad, FaTE may no longer work!
-              log.error("Error adding transaction runner to FaTE executor pool.", e);
-            }
-            break;
-          }
-        }
-        idleCountHistory.clear();
-      } else if (needed < 0) {
-        // If we need the pool to shrink, then ensure excess TransactionRunners are safely stopped.
-        // Flag the necessary number of TransactionRunners to safely stop when they are done work
-        // on a transaction.
-        int numFlagged =
-            (int) runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
-        int numToStop = -1 * (numFlagged + needed);
-        for (var runner : runningTxRunners) {
-          if (numToStop <= 0) {
-            break;
-          }
-          if (runner.flagStop()) {
-            log.trace("Flagging a TransactionRunner to stop...");
-            numToStop--;
-          }
-        }
-      } else {
-        // The property did not change, but should it based on idle Fate threads? Maintain
-        // count of the last X minutes of idle Fate threads. If zero 95% of the time, then suggest
-        // that the MANAGER_FATE_THREADPOOL_SIZE be increased.
-        final long interval = Math.min(60, TimeUnit.MILLISECONDS
-            .toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
-        if (interval == 0) {
-          idleCountHistory.clear();
-        } else {
-          if (idleCountHistory.size() >= interval * 2) { // this task runs every 30s
-            int zeroFateThreadsIdleCount = 0;
-            for (Integer idleConsumerCount : idleCountHistory) {
-              if (idleConsumerCount == 0) {
-                zeroFateThreadsIdleCount++;
-              }
-            }
-            boolean needMoreThreads =
-                (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) >= 0.95;
-            if (needMoreThreads) {
-              log.warn(
-                  "All Fate threads appear to be busy for the last {} minutes,"
-                      + " consider increasing property: {}",
-                  interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
-              // Clear the history so that we don't log for interval minutes.
-              idleCountHistory.clear();
-            } else {
-              while (idleCountHistory.size() >= interval * 2) {
-                idleCountHistory.remove();
-              }
-            }
-          }
-          idleCountHistory.add(workQueue.getWaitingConsumerCount());
-        }
-      }
-    }, INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS));
-    this.transactionExecutor = pool;
+    ThreadPools.watchCriticalScheduledTask(
+        fatePoolsWatcher.scheduleWithFixedDelay(new FatePoolsWatcher(environment, conf),
+            INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS));
 
     ScheduledExecutorService deadResCleanerExecutor = null;
     if (runDeadResCleaner) {
       // Create a dead reservation cleaner for this store that will periodically clean up
       // reservations held by dead processes, if they exist.
       deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
-          store.type() + "-dead-reservation-cleaner-pool");
+          store.type() == FateInstanceType.USER ? USER_DEAD_RESERVATION_CLEANER_POOL.poolName
+              : META_DEAD_RESERVATION_CLEANER_POOL.poolName);
       ScheduledFuture<?> deadReservationCleaner =
           deadResCleanerExecutor.scheduleWithFixedDelay(new DeadReservationCleaner(),
               INITIAL_DELAY.toSeconds(), getDeadResCleanupDelay().toSeconds(), SECONDS);
       ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
     }
     this.deadResCleanerExecutor = deadResCleanerExecutor;
 
-    this.workFinder = Threads.createThread("Fate work finder", new WorkFinder());
-    this.workFinder.start();
+    startFateExecutors(environment, conf, fateExecutors);
+  }
+
+  protected void startFateExecutors(T environment, AccumuloConfiguration conf,
+      Set<FateExecutor<T>> fateExecutors) {
+    for (var poolConf : getPoolConfigurations(conf).entrySet()) {
+      // no fate threads are running at this point; fine not to synchronize
+      fateExecutors
+          .add(new FateExecutor<>(this, environment, poolConf.getKey(), poolConf.getValue()));
+    }
+  }
+
+  /**
+   * Returns a map of the current pool configurations as set in the given config. Each key is a set
+   * of fate operations and each value is an integer for the number of threads assigned to work
+   * those fate operations.
+   */
+  protected Map<Set<FateOperation>,Integer> getPoolConfigurations(AccumuloConfiguration conf) {
+    Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
+    final var json = JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
+

Review Comment:
   The code in PropertyType validates that all FateOperations are present in the config and that there is no duplication. Not sure where this should go, but in the case where those conditions are not met, will it get to this code?
   
   Also, it would be nice to log somewhere what the problem is, if that is not already being done. For example, log that COMMIT_COMPACTION was not seen in the config, or that SYSTEM_SPLIT was seen twice in the config. Not sure where that should be done; I was thinking that maybe it could be done here.
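A minimal sketch of the missing/duplicate check with one message per problem, assuming the JSON value has already been parsed into a map from comma-separated operation lists to thread counts. `Op` is a hypothetical five-value stand-in for `FateOperation`, and `findProblems` is a hypothetical helper, not existing Accumulo code; in the manager the messages would presumably be passed to the logger rather than returned.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FatePoolConfigCheck {
  // Hypothetical subset of FATE operations, for illustration only.
  enum Op { TABLE_CREATE, TABLE_COMPACT, COMMIT_COMPACTION, TABLE_MERGE, SYSTEM_SPLIT }

  /**
   * Returns one message per problem: unknown operation names, operations missing from every
   * pool, and operations listed in more than one pool. Each key is a comma-separated list.
   */
  static List<String> findProblems(Map<String,Integer> pools) {
    List<String> problems = new ArrayList<>();
    Map<Op,Integer> seen = new EnumMap<>(Op.class);
    for (String key : pools.keySet()) {
      for (String name : key.split(",")) {
        String trimmed = name.trim();
        try {
          // Count how many pools mention each operation.
          seen.merge(Op.valueOf(trimmed), 1, Integer::sum);
        } catch (IllegalArgumentException e) {
          problems.add("Unknown FATE operation in config: " + trimmed);
        }
      }
    }
    for (Op op : Op.values()) {
      int count = seen.getOrDefault(op, 0);
      if (count == 0) {
        problems.add(op + " was not seen in the config");
      } else if (count > 1) {
        problems.add(op + " was seen " + count + " times in the config");
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    Map<String,Integer> pools = new LinkedHashMap<>();
    pools.put("TABLE_CREATE", 1);
    pools.put("TABLE_COMPACT", 4);
    pools.put("TABLE_MERGE,SYSTEM_SPLIT", 2);
    pools.put("SYSTEM_SPLIT", 1);
    // Prints two lines:
    //   COMMIT_COMPACTION was not seen in the config
    //   SYSTEM_SPLIT was seen 2 times in the config
    findProblems(pools).forEach(System.out::println);
  }
}
```

Collecting all problems before reporting means a user fixing the config sees every missing or duplicated operation at once rather than one per restart.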



##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -429,15 +429,34 @@ public enum Property {
   MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval", "60s",
       PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper to update interval.",
       "1.9.3"),
-  MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64", PropertyType.COUNT,
-      "The number of threads used to run fault-tolerant executions (FATE)."
-          + " These are primarily table operations like merge.",
-      "1.4.3"),
+  MANAGER_USER_FATE_CONFIG("manager.user.fate.config", "{"
+      + "\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER\": 1,"
+      + "\"TABLE_BULK_IMPORT2\": 2,"
+      + "\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+      + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT\": 2"
+      + "}", PropertyType.USER_FATE_CONFIG,
+      "The number of threads used to run user-initiated fault-tolerant "
+          + "executions (FATE). These are primarily table operations like merge. Each key/value "
+          + "of the provided JSON corresponds to one thread pool. Each key is a list of one or "
+          + "more FATE operations and each value is the number of threads that will be assigned "
+          + "to the pool.",
+      "4.0.0"),
+  MANAGER_META_FATE_CONFIG("manager.meta.fate.config",
+      "{\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+          + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT\": 2}",
+      PropertyType.META_FATE_CONFIG,
+      "The number of threads used to run system-initiated fault-tolerant "
+          + "executions (FATE). These are primarily table operations like merge. Each key/value "
+          + "of the provided JSON corresponds to one thread pool. Each key is a list of one or "
+          + "more FATE operations and each value is the number of threads that will be assigned "
+          + "to the pool.",
+      "4.0.0"),

Review Comment:
   Not sure it's good to deprecate this prop in 3.1, because there is no replacement in 3.1. So if it is set, it may generate warnings, but there is no action that could be taken to avoid the warning in 3.1.
   
   Could just deprecate it in 4.0 and ignore it, maybe logging a warning if it's set that it's no longer used. Or drop it in 4.0 without any deprecation. However it's done, it would be really nice to log a message in 4.0, if the old prop is set, that informs the user that the new properties need to be used instead. Providing breadcrumbs that help someone figure out what they should do instead is really helpful.
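A minimal sketch of the breadcrumb idea, assuming the site configuration is available as a plain `Map<String,String>` (the real code would read it through Accumulo's configuration classes). `removedPropertyWarning` is a hypothetical helper; the property keys are the ones in this diff.

```java
import java.util.Map;
import java.util.Optional;

public class RemovedPropertyCheck {
  // The property removed in this PR and the two properties that replace it, as named in the diff.
  static final String OLD_KEY = "manager.fate.threadpool.size";
  static final String USER_KEY = "manager.user.fate.config";
  static final String META_KEY = "manager.meta.fate.config";

  /**
   * Hypothetical helper: if the removed property is explicitly set, returns a message pointing
   * the user at the replacement properties; otherwise returns empty.
   */
  static Optional<String> removedPropertyWarning(Map<String,String> siteConfig) {
    String oldValue = siteConfig.get(OLD_KEY);
    if (oldValue == null) {
      return Optional.empty();
    }
    return Optional.of(String.format(
        "Property %s (set to %s) is no longer used and will be ignored; configure FATE thread"
            + " pools with %s and %s instead.",
        OLD_KEY, oldValue, USER_KEY, META_KEY));
  }

  public static void main(String[] args) {
    // Prints the warning, since the old property is set.
    removedPropertyWarning(Map.of(OLD_KEY, "128")).ifPresent(System.out::println);
    // Prints nothing, since the old property is not set.
    removedPropertyWarning(Map.of()).ifPresent(System.out::println);
  }
}
```

Returning an `Optional<String>` keeps the check easy to unit test; the caller would hand the message to `log.warn` once at manager startup.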


