keith-turner commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1945549409
##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -429,15 +429,34 @@ public enum Property {
MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval",
"60s",
PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper
to update interval.",
"1.9.3"),
- MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
PropertyType.COUNT,
- "The number of threads used to run fault-tolerant executions (FATE)."
- + " These are primarily table operations like merge.",
- "1.4.3"),
+ MANAGER_USER_FATE_CONFIG("manager.user.fate.config", "{"
Review Comment:
It would be good to move `fate` further left in the property name so that, when the
properties are sorted, the FATE ones will be next to each other.
```suggestion
MANAGER_USER_FATE_CONFIG("manager.fate.user.config", "{"
```
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -413,111 +324,81 @@ public void run() {
}
/**
- * Creates a Fault-tolerant executor.
+ * Creates a Fault-tolerant executor for the given store type.
*
+ * @param runDeadResCleaner Whether this FATE should run a dead reservation
cleaner. The real
+ * FATEs need have a cleaner, but may be undesirable in testing.
* @param toLogStrFunc A function that converts Repo to Strings that are
suitable for logging
*/
public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
this.store = FateLogger.wrap(store, toLogStrFunc, false);
- this.environment = environment;
- final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
- Property.MANAGER_FATE_THREADPOOL_SIZE, true);
- this.workQueue = new LinkedTransferQueue<>();
- this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
- this.fatePoolWatcher =
+
+ this.fatePoolsWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(()
-> {
- // resize the pool if the property changed
- ThreadPools.resizePool(pool, conf,
Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int configured =
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int needed = configured - runningTxRunners.size();
- if (needed > 0) {
- // If the pool grew, then ensure that there is a TransactionRunner for
each thread
- for (int i = 0; i < needed; i++) {
- try {
- pool.execute(new TransactionRunner());
- } catch (RejectedExecutionException e) {
- // RejectedExecutionException could be shutting down
- if (pool.isShutdown()) {
- // The exception is expected in this case, no need to spam the
logs.
- log.trace("Error adding transaction runner to FaTE executor
pool.", e);
- } else {
- // This is bad, FaTE may no longer work!
- log.error("Error adding transaction runner to FaTE executor
pool.", e);
- }
- break;
- }
- }
- idleCountHistory.clear();
- } else if (needed < 0) {
- // If we need the pool to shrink, then ensure excess
TransactionRunners are safely stopped.
- // Flag the necessary number of TransactionRunners to safely stop when
they are done work
- // on a transaction.
- int numFlagged =
- (int)
runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
- int numToStop = -1 * (numFlagged + needed);
- for (var runner : runningTxRunners) {
- if (numToStop <= 0) {
- break;
- }
- if (runner.flagStop()) {
- log.trace("Flagging a TransactionRunner to stop...");
- numToStop--;
- }
- }
- } else {
- // The property did not change, but should it based on idle Fate
threads? Maintain
- // count of the last X minutes of idle Fate threads. If zero 95% of
the time, then suggest
- // that the MANAGER_FATE_THREADPOOL_SIZE be increased.
- final long interval = Math.min(60, TimeUnit.MILLISECONDS
-
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
- if (interval == 0) {
- idleCountHistory.clear();
- } else {
- if (idleCountHistory.size() >= interval * 2) { // this task runs
every 30s
- int zeroFateThreadsIdleCount = 0;
- for (Integer idleConsumerCount : idleCountHistory) {
- if (idleConsumerCount == 0) {
- zeroFateThreadsIdleCount++;
- }
- }
- boolean needMoreThreads =
- (zeroFateThreadsIdleCount / (double) idleCountHistory.size())
>= 0.95;
- if (needMoreThreads) {
- log.warn(
- "All Fate threads appear to be busy for the last {} minutes,"
- + " consider increasing property: {}",
- interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
- // Clear the history so that we don't log for interval minutes.
- idleCountHistory.clear();
- } else {
- while (idleCountHistory.size() >= interval * 2) {
- idleCountHistory.remove();
- }
- }
- }
- idleCountHistory.add(workQueue.getWaitingConsumerCount());
- }
- }
- }, INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS));
- this.transactionExecutor = pool;
+ ThreadPools.watchCriticalScheduledTask(
+ fatePoolsWatcher.scheduleWithFixedDelay(new
FatePoolsWatcher(environment, conf),
+ INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(),
SECONDS));
ScheduledExecutorService deadResCleanerExecutor = null;
if (runDeadResCleaner) {
// Create a dead reservation cleaner for this store that will
periodically clean up
// reservations held by dead processes, if they exist.
deadResCleanerExecutor =
ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
- store.type() + "-dead-reservation-cleaner-pool");
+ store.type() == FateInstanceType.USER ?
USER_DEAD_RESERVATION_CLEANER_POOL.poolName
+ : META_DEAD_RESERVATION_CLEANER_POOL.poolName);
ScheduledFuture<?> deadReservationCleaner =
deadResCleanerExecutor.scheduleWithFixedDelay(new
DeadReservationCleaner(),
INITIAL_DELAY.toSeconds(), getDeadResCleanupDelay().toSeconds(),
SECONDS);
ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
}
this.deadResCleanerExecutor = deadResCleanerExecutor;
- this.workFinder = Threads.createThread("Fate work finder", new
WorkFinder());
- this.workFinder.start();
+ startFateExecutors(environment, conf, fateExecutors);
+ }
+
+ protected void startFateExecutors(T environment, AccumuloConfiguration conf,
+ Set<FateExecutor<T>> fateExecutors) {
+ for (var poolConf : getPoolConfigurations(conf).entrySet()) {
+ // no fate threads are running at this point; fine not to synchronize
+ fateExecutors
+ .add(new FateExecutor<>(this, environment, poolConf.getKey(),
poolConf.getValue()));
+ }
+ }
+
+ /**
+ * Returns a map of the current pool configurations as set in the given
config. Each key is a set
+ * of fate operations and each value is an integer for the number of threads
assigned to work
+ * those fate operations.
+ */
+ protected Map<Set<FateOperation>,Integer>
getPoolConfigurations(AccumuloConfiguration conf) {
+ Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
+ final var json =
JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
+
Review Comment:
The code in PropertyType validates that all FateOperations are present in
the config and that there is no duplication. Not sure where this should go, but in
the case where those conditions are not met, will execution ever reach this code?
Also, it would be nice to log somewhere what the problem is, if that is not
already being done. For example, log that COMMIT_COMPACTION was not seen in the
config, or log that SYSTEM_SPLIT was seen twice in the config. Not sure where
that should be done; I was thinking that maybe it could be done here.
##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -429,15 +429,34 @@ public enum Property {
MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval",
"60s",
PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper
to update interval.",
"1.9.3"),
- MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
PropertyType.COUNT,
- "The number of threads used to run fault-tolerant executions (FATE)."
- + " These are primarily table operations like merge.",
- "1.4.3"),
+ MANAGER_USER_FATE_CONFIG("manager.user.fate.config", "{"
+ +
"\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER\":
1,"
+ + "\"TABLE_BULK_IMPORT2\": 2,"
+ + "\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+ +
"\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT\":
2"
+ + "}", PropertyType.USER_FATE_CONFIG,
+ "The number of threads used to run user-initiated fault-tolerant "
+ + "executions (FATE). These are primarily table operations like
merge. Each key/value "
+ + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
+ + "more FATE operations and each value is the number of threads that
will be assigned "
+ + "to the pool.",
+ "4.0.0"),
+ MANAGER_META_FATE_CONFIG("manager.meta.fate.config",
+ "{\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+ + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT\": 2}",
+ PropertyType.META_FATE_CONFIG,
+ "The number of threads used to run system-initiated fault-tolerant "
+ + "executions (FATE). These are primarily table operations like
merge. Each key/value "
+ + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
+ + "more FATE operations and each value is the number of threads that
will be assigned "
+ + "to the pool.",
+ "4.0.0"),
Review Comment:
Not sure it's good to deprecate this prop in 3.1 because there is no replacement
in 3.1. So if it is set, it may generate warnings, but there is no action that could
be taken to avoid the warning in 3.1.
We could just deprecate it in 4.0 and ignore it, maybe logging a warning if it's set
that it's no longer used. Or drop it in 4.0 without any deprecation. However it's
done, it would be really nice to log a message in 4.0 if the old prop is set
that informs the user that the new properties need to be used instead.
Providing breadcrumbs that help someone figure out what they should do
instead is really helpful.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]