kevinrr888 commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1949925574
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -413,111 +324,81 @@ public void run() {
}
/**
- * Creates a Fault-tolerant executor.
+ * Creates a Fault-tolerant executor for the given store type.
*
+ * @param runDeadResCleaner Whether this FATE should run a dead reservation
cleaner. The real
+ * FATEs need have a cleaner, but may be undesirable in testing.
* @param toLogStrFunc A function that converts Repo to Strings that are
suitable for logging
*/
public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
this.store = FateLogger.wrap(store, toLogStrFunc, false);
- this.environment = environment;
- final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
- Property.MANAGER_FATE_THREADPOOL_SIZE, true);
- this.workQueue = new LinkedTransferQueue<>();
- this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
- this.fatePoolWatcher =
+
+ this.fatePoolsWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(()
-> {
- // resize the pool if the property changed
- ThreadPools.resizePool(pool, conf,
Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int configured =
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int needed = configured - runningTxRunners.size();
- if (needed > 0) {
- // If the pool grew, then ensure that there is a TransactionRunner for
each thread
- for (int i = 0; i < needed; i++) {
- try {
- pool.execute(new TransactionRunner());
- } catch (RejectedExecutionException e) {
- // RejectedExecutionException could be shutting down
- if (pool.isShutdown()) {
- // The exception is expected in this case, no need to spam the
logs.
- log.trace("Error adding transaction runner to FaTE executor
pool.", e);
- } else {
- // This is bad, FaTE may no longer work!
- log.error("Error adding transaction runner to FaTE executor
pool.", e);
- }
- break;
- }
- }
- idleCountHistory.clear();
- } else if (needed < 0) {
- // If we need the pool to shrink, then ensure excess
TransactionRunners are safely stopped.
- // Flag the necessary number of TransactionRunners to safely stop when
they are done work
- // on a transaction.
- int numFlagged =
- (int)
runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
- int numToStop = -1 * (numFlagged + needed);
- for (var runner : runningTxRunners) {
- if (numToStop <= 0) {
- break;
- }
- if (runner.flagStop()) {
- log.trace("Flagging a TransactionRunner to stop...");
- numToStop--;
- }
- }
- } else {
- // The property did not change, but should it based on idle Fate
threads? Maintain
- // count of the last X minutes of idle Fate threads. If zero 95% of
the time, then suggest
- // that the MANAGER_FATE_THREADPOOL_SIZE be increased.
- final long interval = Math.min(60, TimeUnit.MILLISECONDS
-
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
- if (interval == 0) {
- idleCountHistory.clear();
- } else {
- if (idleCountHistory.size() >= interval * 2) { // this task runs
every 30s
- int zeroFateThreadsIdleCount = 0;
- for (Integer idleConsumerCount : idleCountHistory) {
- if (idleConsumerCount == 0) {
- zeroFateThreadsIdleCount++;
- }
- }
- boolean needMoreThreads =
- (zeroFateThreadsIdleCount / (double) idleCountHistory.size())
>= 0.95;
- if (needMoreThreads) {
- log.warn(
- "All Fate threads appear to be busy for the last {} minutes,"
- + " consider increasing property: {}",
- interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
- // Clear the history so that we don't log for interval minutes.
- idleCountHistory.clear();
- } else {
- while (idleCountHistory.size() >= interval * 2) {
- idleCountHistory.remove();
- }
- }
- }
- idleCountHistory.add(workQueue.getWaitingConsumerCount());
- }
- }
- }, INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS));
- this.transactionExecutor = pool;
+ ThreadPools.watchCriticalScheduledTask(
+ fatePoolsWatcher.scheduleWithFixedDelay(new
FatePoolsWatcher(environment, conf),
+ INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(),
SECONDS));
ScheduledExecutorService deadResCleanerExecutor = null;
if (runDeadResCleaner) {
// Create a dead reservation cleaner for this store that will
periodically clean up
// reservations held by dead processes, if they exist.
deadResCleanerExecutor =
ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
- store.type() + "-dead-reservation-cleaner-pool");
+ store.type() == FateInstanceType.USER ?
USER_DEAD_RESERVATION_CLEANER_POOL.poolName
+ : META_DEAD_RESERVATION_CLEANER_POOL.poolName);
ScheduledFuture<?> deadReservationCleaner =
deadResCleanerExecutor.scheduleWithFixedDelay(new
DeadReservationCleaner(),
INITIAL_DELAY.toSeconds(), getDeadResCleanupDelay().toSeconds(),
SECONDS);
ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
}
this.deadResCleanerExecutor = deadResCleanerExecutor;
- this.workFinder = Threads.createThread("Fate work finder", new
WorkFinder());
- this.workFinder.start();
+ startFateExecutors(environment, conf, fateExecutors);
+ }
+
+ protected void startFateExecutors(T environment, AccumuloConfiguration conf,
+ Set<FateExecutor<T>> fateExecutors) {
+ for (var poolConf : getPoolConfigurations(conf).entrySet()) {
+ // no fate threads are running at this point; fine not to synchronize
+ fateExecutors
+ .add(new FateExecutor<>(this, environment, poolConf.getKey(),
poolConf.getValue()));
+ }
+ }
+
+ /**
+ * Returns a map of the current pool configurations as set in the given
config. Each key is a set
+ * of fate operations and each value is an integer for the number of threads
assigned to work
+ * those fate operations.
+ */
+ protected Map<Set<FateOperation>,Integer>
getPoolConfigurations(AccumuloConfiguration conf) {
+ Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
+ final var json =
JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
+
Review Comment:
If PropertyType behaves as I expect, this code should never be reached. My
understanding is that PropertyType is how a property's value is validated
before it is ever set, which ensures an invalid property can never be set. If
that is not how PropertyType works, this code may be reached. (Though I'm not
sure what other purpose PropertyType would serve.)
I also added logging to PropertyType that explains why a property value failed to be set.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]