kevinrr888 commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1941675846


##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -256,21 +256,24 @@ protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) {
       RowFateStatusFilter.configureScanner(scanner, statuses);
       TxColumnFamily.STATUS_COLUMN.fetch(scanner);
       TxColumnFamily.RESERVATION_COLUMN.fetch(scanner);
+      TxInfoColumnFamily.TX_NAME_COLUMN.fetch(scanner);
       return scanner.stream().onClose(scanner::close).map(e -> {
         String txUUIDStr = e.getKey().getRow().toString();
         FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
         SortedMap<Key,Value> rowMap;
         TStatus status = TStatus.UNKNOWN;
         FateReservation reservation = null;
+        Fate.FateOperation fateOp = null;

Review Comment:
   My implementation renames `txName` to `fateOp` wherever I wrote new code. Please ignore the naming discrepancy: both refer to the same thing and will be consolidated in a later PR via https://github.com/apache/accumulo/issues/5230.



##########
test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java:
##########
@@ -249,30 +249,32 @@ private void testMultipleFateInstances(TestStoreFactory<SleepingTestEnv> testSto
     Fate<SleepingTestEnv> fate2 =
         new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance());
 
-    for (int i = 0; i < numFateIds; i++) {
-      FateId fateId;
-      // Start half the txns using fate1, and the other half using fate2
-      if (i % 2 == 0) {
-        fateId = fate1.startTransaction();
-        fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test");
-      } else {
-        fateId = fate2.startTransaction();
-        fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test");
+    try {

Review Comment:
   I moved `shutdown()` into a `finally` block in this class's tests so that a failure in one test cannot affect the others. This should have been there from the start.
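A minimal sketch of the cleanup pattern described above. A plain JDK `ExecutorService` stands in for `Fate` here (an assumption for illustration; `ShutdownInFinallySketch` and `testMultipleInstances` are hypothetical names), and the simulated failure shows that the `finally` block still shuts both instances down.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownInFinallySketch {
  // Stand-ins for the two Fate instances created by the test (hypothetical).
  static ExecutorService fate1;
  static ExecutorService fate2;

  // Test body that may fail; cleanup lives in finally so it always runs.
  static void testMultipleInstances(boolean fail) {
    fate1 = Executors.newFixedThreadPool(2);
    fate2 = Executors.newFixedThreadPool(2);
    try {
      fate1.submit(() -> {});
      fate2.submit(() -> {});
      if (fail) {
        throw new AssertionError("simulated test failure");
      }
    } finally {
      // Runs whether the body passed or threw, so no threads leak into the next test.
      fate1.shutdown();
      fate2.shutdown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    try {
      testMultipleInstances(true);
    } catch (AssertionError expected) {
      System.out.println("test failed: " + expected.getMessage());
    }
    // Both report true even though the test body threw.
    System.out.println("fate1 shutdown: " + fate1.isShutdown());
    System.out.println("fate2 shutdown: " + fate2.isShutdown());
    fate1.awaitTermination(5, TimeUnit.SECONDS);
    fate2.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

In JUnit the same effect is often achieved with an `@AfterEach` method, but a local `try`/`finally` keeps the cleanup next to the instances a single test creates.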



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -144,260 +140,175 @@ public TFateOperation toThrift() {
       }
       return top;
     }
+
+    public static Set<FateOperation> getAllUserFateOps() {
+      return allUserFateOps;
+    }
+
+    public static Set<FateOperation> getAllMetaFateOps() {
+      return allMetaFateOps;
+    }
   }
 
-  /**
-   * A single thread that finds transactions to work on and queues them up. Do not want each worker
-   * thread going to the store and looking for work as it would place more load on the store.
-   */
-  private class WorkFinder implements Runnable {
+  // The fate pools watcher:
+  // - Maintains a TransactionRunner per available thread per pool/FateExecutor. Does so by
+  // periodically checking the pools for an inactive thread (i.e., a thread running a
+  // TransactionRunner died or the pool size was increased in the property), resizing the pool and
+  // submitting new runners as needed. Also safely stops the necessary number of TransactionRunners
+  // if the pool size in the property was decreased.
+  // - Warns the user to consider increasing the pool size (or splitting the fate ops assigned to
+  // that pool into separate pools) for any pool that does not often have any idle threads.
+  private class FatePoolsWatcher implements Runnable {
+    private final T environment;
+    private final AccumuloConfiguration conf;
+
+    private FatePoolsWatcher(T environment, AccumuloConfiguration conf) {
+      this.environment = environment;
+      this.conf = conf;
+    }
 
     @Override
     public void run() {
-      while (keepRunning.get()) {
-        try {
-          store.runnable(keepRunning, fateId -> {
-            while (keepRunning.get()) {
-              try {
-                // The reason for calling transfer instead of queueing is avoid rescanning the
-                // storage layer and adding the same thing over and over. For example if all threads
-                // were busy, the queue size was 100, and there are three runnable things in the
-                // store. Do not want to keep scanning the store adding those same 3 runnable things
-                // until the queue is full.
-                if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) {
-                  break;
-                }
-              } catch (InterruptedException e) {
-                throw new IllegalStateException(e);
-              }
+      // Read from the config here and here only. Must avoid reading the same property from the
+      // config more than once since it can change at any point in this execution
+      var poolConfigs = getPoolConfigurations(conf);
+      var idleCheckIntervalMillis = conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
+
+      // shutdown task: shutdown fate executors whose set of fate operations are no longer present
+      // in the config
+      synchronized (fateExecutors) {
+        final var fateExecutorsIter = fateExecutors.iterator();
+        while (fateExecutorsIter.hasNext()) {
+          var fateExecutor = fateExecutorsIter.next();
+
+          // if this fate executors set of fate ops is no longer present in the config...
+          if (!poolConfigs.containsKey(fateExecutor.getFateOps())) {
+            if (!fateExecutor.isShutdown()) {
+              log.debug("The config for {} has changed invalidating {}. Gracefully shutting down "
+                  + "the FateExecutor.", getFateConfigProp(), fateExecutor);
+              fateExecutor.initiateShutdown();
+            } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
+              log.debug("{} has been shutdown, but is still actively working on transactions.",
+                  fateExecutor);
+            } else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) {
+              log.debug("{} has been shutdown and all threads have safely terminated.",
+                  fateExecutor);
+              fateExecutorsIter.remove();
             }
-          });
-        } catch (Exception e) {
-          if (keepRunning.get()) {
-            log.warn("Failure while attempting to find work for fate", e);
-          } else {
-            log.debug("Failure while attempting to find work for fate", e);
           }
-
-          workQueue.clear();
         }
       }
-    }
-  }
-
-  private class TransactionRunner implements Runnable {
-    // used to signal a TransactionRunner to stop in the case where there are too many running
-    // i.e., the property for the pool size decreased and we have excess TransactionRunners
-    private final AtomicBoolean stop = new AtomicBoolean(false);
 
-    private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
-      while (keepRunning.get() && !stop.get()) {
-        FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);
-
-        if (unreservedFateId == null) {
-          continue;
-        }
-        var optionalopStore = store.tryReserve(unreservedFateId);
-        if (optionalopStore.isPresent()) {
-          return optionalopStore;
+      // replacement task: at this point, the existing FateExecutors that were invalidated by the
+      // config changes have started shutdown or finished shutdown. Now create any new replacement
+      // FateExecutors needed
+      for (var poolConfig : poolConfigs.entrySet()) {
+        var configFateOps = poolConfig.getKey();
+        var configPoolSize = poolConfig.getValue();
+        synchronized (fateExecutors) {
+          if (fateExecutors.stream().map(FateExecutor::getFateOps)
+              .noneMatch(fo -> fo.equals(configFateOps))) {
+            fateExecutors
+                .add(new FateExecutor<>(Fate.this, environment, configFateOps, configPoolSize));
+          }
         }
       }
 
-      return Optional.empty();
-    }
-
-    @Override
-    public void run() {
-      runningTxRunners.add(this);
-      try {
-        while (keepRunning.get() && !stop.get()) {
-          FateTxStore<T> txStore = null;
-          ExecutionState state = new ExecutionState();
-          try {
-            var optionalopStore = reserveFateTx();
-            if (optionalopStore.isPresent()) {
-              txStore = optionalopStore.orElseThrow();
-            } else {
-              continue;
-            }
-            state.status = txStore.getStatus();
-            state.op = txStore.top();
-            if (state.status == FAILED_IN_PROGRESS) {
-              processFailed(txStore, state.op);
-            } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
+      // resize task: For each fate executor, resize the pool to match the config as necessary and
+      // submit new TransactionRunners if the pool grew, stop TransactionRunners if the pool
+      // shrunk, and potentially suggest resizing the pool if the load is consistently high.
+      synchronized (fateExecutors) {
+        for (var fateExecutor : fateExecutors) {
+          if (fateExecutor.isShutdown()) {
+            continue;
+          }
+          final var pool = fateExecutor.getTransactionExecutor();
+          final var poolName = fateExecutor.getPoolName();
+          final var runningTxRunners = fateExecutor.getRunningTxRunners();
+          final int configured = poolConfigs.get(fateExecutor.getFateOps());
+          ThreadPools.resizePool(pool, () -> configured, poolName);

Review Comment:
   I have not yet explored changing this to a cached thread pool (discussed in the https://github.com/apache/accumulo/pull/5263#discussion_r1917457670 thread). The changes were already fairly large and I didn't want to add more. I'm noting it here as a reminder for a potential follow-on.
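The resize task calls `ThreadPools.resizePool`, whose implementation is not part of this diff. The following is a minimal sketch, assuming a fixed-size `ThreadPoolExecutor` (core size equal to max size), of how such a pool can be resized in place with standard JDK setters, together with one possible form of the "pool rarely has idle threads" warning. `PoolResizeSketch`, `IdleHistory`, and the all-zeros-over-a-window rule are hypothetical and are not taken from the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolResizeSketch {

  // Resizes a fixed-size pool (core == max) in place. Core may never exceed max, so
  // when growing set max first, and when shrinking set core first. newSize must be >= 1.
  public static void resize(ThreadPoolExecutor pool, int newSize) {
    if (newSize > pool.getMaximumPoolSize()) {
      pool.setMaximumPoolSize(newSize);
      pool.setCorePoolSize(newSize);
    } else {
      pool.setCorePoolSize(newSize);
      pool.setMaximumPoolSize(newSize);
    }
  }

  // Hypothetical heuristic: keep the last `window` samples of idle-thread counts and
  // flag the pool as possibly undersized when a full window contains no idle threads.
  public static final class IdleHistory {
    private final int window;
    private final Deque<Integer> samples = new ArrayDeque<>();

    public IdleHistory(int window) {
      this.window = window;
    }

    public boolean recordAndCheck(int idleThreads) {
      samples.addLast(idleThreads);
      if (samples.size() > window) {
        samples.removeFirst();
      }
      return samples.size() == window && samples.stream().allMatch(c -> c == 0);
    }
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    resize(pool, 4);
    System.out.println(pool.getCorePoolSize() + " " + pool.getMaximumPoolSize()); // 4 4
    resize(pool, 1);
    System.out.println(pool.getCorePoolSize() + " " + pool.getMaximumPoolSize()); // 1 1
    pool.shutdown();

    IdleHistory history = new IdleHistory(3);
    for (int idle : new int[] {0, 0, 0, 1}) {
      // prints false, false, true, false
      System.out.println(history.recordAndCheck(idle));
    }
  }
}
```

The setter order matters because, on JDK 9 and later, `setCorePoolSize` rejects a core size above the current maximum and `setMaximumPoolSize` rejects a maximum below the current core size. Shrinking also only takes effect as threads become idle; a thread running a long-lived `TransactionRunner` loop never does, which is why the diff separately signals excess runners to stop.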



##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
+import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TransferQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.util.ShutdownUtil;
+import org.apache.accumulo.core.util.Timer;
+import org.apache.accumulo.core.util.threads.ThreadPoolNames;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles finding and working on FATE work. Only finds/works on fate operations that it is assigned
+ * to work on defined by 'fateOps'
+ */
+public class FateExecutor<T> {
+  private static final Logger log = LoggerFactory.getLogger(FateExecutor.class);
+  private final Logger runnerLog = LoggerFactory.getLogger(TransactionRunner.class);
+
+  private final T environment;
+  private final Fate<T> fate;
+  private final Thread workFinder;
+  private final TransferQueue<FateId> workQueue;
+  private final String poolName;
+  private final ThreadPoolExecutor transactionExecutor;
+  private final Set<TransactionRunner> runningTxRunners;
+  private final Set<Fate.FateOperation> fateOps;
+  private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();
+
+  public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps, int poolSize) {
+    final String operatesOn = fate.getStore().type().name().toLowerCase() + "."
+        + fateOps.stream().map(fo -> fo.name().toLowerCase()).collect(Collectors.joining("."));
+    final String transactionRunnerPoolName =
+        ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + operatesOn;
+    final String workFinderThreadName = "fate.work.finder." + operatesOn;

Review Comment:
   Example pool name: `accumulo.pool.manager.fate.meta.table_split.system_split.table_delete_range.table_merge`
   This is long, but descriptive.
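A sketch of how the example name is assembled from the constructor code in the diff. The prefix constant's value is an assumption inferred from the example name, since `ThreadPoolNames` is not shown, and `FatePoolNameSketch` is a hypothetical class that takes the ops as a `List` so the order is deterministic; in `FateExecutor` the order is whatever order the `fateOps` set iterates in.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class FatePoolNameSketch {

  // Assumed value of ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName, inferred from the
  // example pool name above; the real constant is not shown in this diff.
  static final String POOL_PREFIX = "accumulo.pool.manager.fate.";

  // "<store type>.<op1>.<op2>...", lower-cased and dot-joined, as in the FateExecutor
  // constructor. Ops are taken in list order; the real code uses the Set's iteration order.
  static String operatesOn(String storeType, List<String> fateOps) {
    return storeType.toLowerCase(Locale.ROOT) + "."
        + fateOps.stream().map(op -> op.toLowerCase(Locale.ROOT))
            .collect(Collectors.joining("."));
  }

  public static String poolName(String storeType, List<String> fateOps) {
    return POOL_PREFIX + operatesOn(storeType, fateOps);
  }

  // Same suffix, different prefix, matching workFinderThreadName in the diff.
  public static String workFinderThreadName(String storeType, List<String> fateOps) {
    return "fate.work.finder." + operatesOn(storeType, fateOps);
  }

  public static void main(String[] args) {
    List<String> ops = List.of("TABLE_SPLIT", "SYSTEM_SPLIT", "TABLE_DELETE_RANGE", "TABLE_MERGE");
    // prints accumulo.pool.manager.fate.meta.table_split.system_split.table_delete_range.table_merge
    System.out.println(poolName("META", ops));
    // prints fate.work.finder.meta.table_split.system_split.table_delete_range.table_merge
    System.out.println(workFinderThreadName("META", ops));
  }
}
```

Because the name grows with every op in the pool, a pool covering many ops (such as the ten-op user default) produces a correspondingly long thread name.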



##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -429,15 +429,34 @@ public enum Property {
   MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval", "60s",
       PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper to update interval.",
       "1.9.3"),
-  MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64", PropertyType.COUNT,
-      "The number of threads used to run fault-tolerant executions (FATE)."
-          + " These are primarily table operations like merge.",
-      "1.4.3"),
+  MANAGER_USER_FATE_CONFIG("manager.user.fate.config", "{"
+      + "\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER\": 1,"
+      + "\"TABLE_BULK_IMPORT2\": 2,"
+      + "\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+      + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT\": 2"
+      + "}", PropertyType.USER_FATE_CONFIG,
+      "The number of threads used to run user-initiated fault-tolerant "
+          + "executions (FATE). These are primarily table operations like merge. Each key/value "
+          + "of the provided JSON corresponds to one thread pool. Each key is a list of one or "
+          + "more FATE operations and each value is the number of threads that will be assigned "
+          + "to the pool.",
+      "4.0.0"),
+  MANAGER_META_FATE_CONFIG("manager.meta.fate.config",
+      "{\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+          + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT\": 2}",
+      PropertyType.META_FATE_CONFIG,
+      "The number of threads used to run system-initiated fault-tolerant "
+          + "executions (FATE). These are primarily table operations like merge. Each key/value "
+          + "of the provided JSON corresponds to one thread pool. Each key is a list of one or "
+          + "more FATE operations and each value is the number of threads that will be assigned "
+          + "to the pool.",
+      "4.0.0"),

Review Comment:
   Several comments here:
   - This is my best attempt at determining which FATE ops are valid META FATE ops. I may have excluded some that should be included, or vice versa.
   - What is the proper way to deprecate the old `MANAGER_FATE_THREADPOOL_SIZE` property?
   - These default pools and pool sizes probably aren't optimal. Input on how the FATE ops should be split and how many threads each pool should get would be appreciated. This was just my best attempt.
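To make the first question concrete, here is a sketch that reads a value shaped like the `manager.meta.fate.config` default into a map from op set to pool size, and rejects ops outside an allowed set, ops assigned to two pools, non-positive sizes, and allowed ops that no pool covers. It is a simplified regex reader for the flat `{"OP_A,OP_B": n}` shape only, not a JSON parser, and neither the `FateConfigSketch` class nor these checks are taken from Accumulo; how `PropertyType.META_FATE_CONFIG` actually validates values is not shown in this diff.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FateConfigSketch {

  // One entry of the flat form "OP_A,OP_B": 4. Nested JSON is not supported.
  private static final Pattern ENTRY =
      Pattern.compile("\"([A-Z0-9_,\\s]+)\"\\s*:\\s*(\\d+)");

  // Parses the config into (set of op names -> pool size) and rejects: ops not in
  // validOps, an op assigned to more than one pool, non-positive pool sizes, and
  // valid ops that no pool covers.
  public static Map<Set<String>, Integer> parse(String json, Set<String> validOps) {
    Map<Set<String>, Integer> pools = new LinkedHashMap<>();
    Set<String> seen = new HashSet<>();
    Matcher m = ENTRY.matcher(json);
    while (m.find()) {
      Set<String> ops = new HashSet<>();
      for (String raw : m.group(1).split(",")) {
        String op = raw.trim();
        if (!validOps.contains(op)) {
          throw new IllegalArgumentException(op + " is not a valid op for this store");
        }
        if (!seen.add(op)) {
          throw new IllegalArgumentException(op + " is assigned to more than one pool");
        }
        ops.add(op);
      }
      int size = Integer.parseInt(m.group(2));
      if (size <= 0) {
        throw new IllegalArgumentException("Pool size must be positive: " + size);
      }
      pools.put(Set.copyOf(ops), size);
    }
    Set<String> uncovered = new HashSet<>(validOps);
    uncovered.removeAll(seen);
    if (!uncovered.isEmpty()) {
      throw new IllegalArgumentException("No pool configured for " + uncovered);
    }
    return pools;
  }

  public static void main(String[] args) {
    // Hypothetical "valid META ops" set: exactly the ops in the proposed default above.
    Set<String> metaOps = Set.of("TABLE_COMPACT", "TABLE_CANCEL_COMPACT", "COMMIT_COMPACTION",
        "TABLE_MERGE", "TABLE_DELETE_RANGE", "TABLE_SPLIT", "SYSTEM_SPLIT");
    String metaDefault = "{\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
        + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT\": 2}";
    System.out.println(parse(metaDefault, metaOps));
  }
}
```

Whether every allowed op must be covered by some pool is a policy choice; a real implementation could instead fall back to a default pool for uncovered ops.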



-- 
This is an automated message from the Apache Git Service.