dlmarion commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1944891008
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java:
##########
@@ -584,6 +584,16 @@ public TStatus getStatus() {
public Optional<FateReservation> getFateReservation() {
return nodeSupplier.get().reservation;
}
+
+ @Override
+ public Optional<Fate.FateOperation> getFateOperation() {
+ var fateOp = nodeSupplier.get().txInfo.get(TxInfo.TX_NAME);
+ if (fateOp == null) {
+ return Optional.empty();
+ } else {
+ return Optional.of((Fate.FateOperation) fateOp);
+ }
Review Comment:
Might be able to use the following:
```suggestion
var fateOp = (Fate.FateOperation)
nodeSupplier.get().txInfo.get(TxInfo.TX_NAME);
return Optional.ofNullable(fateOp);
```
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -413,111 +324,81 @@ public void run() {
}
/**
- * Creates a Fault-tolerant executor.
+ * Creates a Fault-tolerant executor for the given store type.
*
+ * @param runDeadResCleaner Whether this FATE should run a dead reservation
cleaner. The real
+ * FATEs need have a cleaner, but may be undesirable in testing.
* @param toLogStrFunc A function that converts Repo to Strings that are
suitable for logging
*/
public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
this.store = FateLogger.wrap(store, toLogStrFunc, false);
- this.environment = environment;
- final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
- Property.MANAGER_FATE_THREADPOOL_SIZE, true);
- this.workQueue = new LinkedTransferQueue<>();
- this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
- this.fatePoolWatcher =
+
+ this.fatePoolsWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(()
-> {
- // resize the pool if the property changed
- ThreadPools.resizePool(pool, conf,
Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int configured =
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int needed = configured - runningTxRunners.size();
- if (needed > 0) {
- // If the pool grew, then ensure that there is a TransactionRunner for
each thread
- for (int i = 0; i < needed; i++) {
- try {
- pool.execute(new TransactionRunner());
- } catch (RejectedExecutionException e) {
- // RejectedExecutionException could be shutting down
- if (pool.isShutdown()) {
- // The exception is expected in this case, no need to spam the
logs.
- log.trace("Error adding transaction runner to FaTE executor
pool.", e);
- } else {
- // This is bad, FaTE may no longer work!
- log.error("Error adding transaction runner to FaTE executor
pool.", e);
- }
- break;
- }
- }
- idleCountHistory.clear();
- } else if (needed < 0) {
- // If we need the pool to shrink, then ensure excess
TransactionRunners are safely stopped.
- // Flag the necessary number of TransactionRunners to safely stop when
they are done work
- // on a transaction.
- int numFlagged =
- (int)
runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
- int numToStop = -1 * (numFlagged + needed);
- for (var runner : runningTxRunners) {
- if (numToStop <= 0) {
- break;
- }
- if (runner.flagStop()) {
- log.trace("Flagging a TransactionRunner to stop...");
- numToStop--;
- }
- }
- } else {
- // The property did not change, but should it based on idle Fate
threads? Maintain
- // count of the last X minutes of idle Fate threads. If zero 95% of
the time, then suggest
- // that the MANAGER_FATE_THREADPOOL_SIZE be increased.
- final long interval = Math.min(60, TimeUnit.MILLISECONDS
-
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
- if (interval == 0) {
- idleCountHistory.clear();
- } else {
- if (idleCountHistory.size() >= interval * 2) { // this task runs
every 30s
- int zeroFateThreadsIdleCount = 0;
- for (Integer idleConsumerCount : idleCountHistory) {
- if (idleConsumerCount == 0) {
- zeroFateThreadsIdleCount++;
- }
- }
- boolean needMoreThreads =
- (zeroFateThreadsIdleCount / (double) idleCountHistory.size())
>= 0.95;
- if (needMoreThreads) {
- log.warn(
- "All Fate threads appear to be busy for the last {} minutes,"
- + " consider increasing property: {}",
- interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
- // Clear the history so that we don't log for interval minutes.
- idleCountHistory.clear();
- } else {
- while (idleCountHistory.size() >= interval * 2) {
- idleCountHistory.remove();
- }
- }
- }
- idleCountHistory.add(workQueue.getWaitingConsumerCount());
- }
- }
- }, INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS));
- this.transactionExecutor = pool;
+ ThreadPools.watchCriticalScheduledTask(
+ fatePoolsWatcher.scheduleWithFixedDelay(new
FatePoolsWatcher(environment, conf),
+ INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(),
SECONDS));
ScheduledExecutorService deadResCleanerExecutor = null;
if (runDeadResCleaner) {
// Create a dead reservation cleaner for this store that will
periodically clean up
// reservations held by dead processes, if they exist.
deadResCleanerExecutor =
ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
- store.type() + "-dead-reservation-cleaner-pool");
+ store.type() == FateInstanceType.USER ?
USER_DEAD_RESERVATION_CLEANER_POOL.poolName
+ : META_DEAD_RESERVATION_CLEANER_POOL.poolName);
ScheduledFuture<?> deadReservationCleaner =
deadResCleanerExecutor.scheduleWithFixedDelay(new
DeadReservationCleaner(),
INITIAL_DELAY.toSeconds(), getDeadResCleanupDelay().toSeconds(),
SECONDS);
ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
}
this.deadResCleanerExecutor = deadResCleanerExecutor;
- this.workFinder = Threads.createThread("Fate work finder", new
WorkFinder());
- this.workFinder.start();
+ startFateExecutors(environment, conf, fateExecutors);
+ }
+
+ protected void startFateExecutors(T environment, AccumuloConfiguration conf,
+ Set<FateExecutor<T>> fateExecutors) {
+ for (var poolConf : getPoolConfigurations(conf).entrySet()) {
+ // no fate threads are running at this point; fine not to synchronize
+ fateExecutors
+ .add(new FateExecutor<>(this, environment, poolConf.getKey(),
poolConf.getValue()));
+ }
+ }
+
+ /**
+ * Returns a map of the current pool configurations as set in the given
config. Each key is a set
+ * of fate operations and each value is an integer for the number of threads
assigned to work
+ * those fate operations.
+ */
+ protected Map<Set<FateOperation>,Integer>
getPoolConfigurations(AccumuloConfiguration conf) {
+ Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
+ final var json =
JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
+
+ for (var entry : json.entrySet()) {
+ var key = entry.getKey();
+ var val = entry.getValue().getAsInt();
+ var fateOpsStrArr = key.split(",");
+ Set<FateOperation> fateOpsSet =
+
Arrays.stream(fateOpsStrArr).map(FateOperation::valueOf).collect(Collectors.toSet());
+
+ poolConfigs.put(fateOpsSet, val);
Review Comment:
I'm thinking that fateOpsSet should be sorted. Likewise, I think
fateExecutor.getFateOps() needs to return a sorted set also.
##########
core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java:
##########
@@ -393,4 +411,60 @@ public static IntStream parse(String portRange) {
}
+ private static class ValidFateConfig implements Predicate<String> {
+ private final Set<Fate.FateOperation> allFateOps;
+
+ private ValidFateConfig(Set<Fate.FateOperation> allFateOps) {
+ this.allFateOps = allFateOps;
+ }
+
+ @Override
+ public boolean test(String s) {
+ final Set<Fate.FateOperation> seenFateOps;
+
+ try {
+ final var json = JsonParser.parseString(s).getAsJsonObject();
+ seenFateOps = new HashSet<>();
+
+ for (var entry : json.entrySet()) {
+ var key = entry.getKey();
+ var val = entry.getValue().getAsInt();
+ if (val <= 0) {
+ return false;
+ }
+ var fateOpsStrArr = key.split(",");
+ for (String fateOpStr : fateOpsStrArr) {
+ Fate.FateOperation fateOp = Fate.FateOperation.valueOf(fateOpStr);
+ if (seenFateOps.contains(fateOp)) {
+ return false;
+ }
+ seenFateOps.add(fateOp);
+ }
+ }
+ } catch (Exception e) {
+ return false;
Review Comment:
We probably want to log the exception; it could be invalid JSON.
##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -429,15 +429,34 @@ public enum Property {
MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval",
"60s",
PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper
to update interval.",
"1.9.3"),
- MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
PropertyType.COUNT,
- "The number of threads used to run fault-tolerant executions (FATE)."
- + " These are primarily table operations like merge.",
- "1.4.3"),
+ MANAGER_USER_FATE_CONFIG("manager.user.fate.config", "{"
+ +
"\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER\":
1,"
+ + "\"TABLE_BULK_IMPORT2\": 2,"
+ + "\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+ +
"\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT\":
2"
+ + "}", PropertyType.USER_FATE_CONFIG,
+ "The number of threads used to run user-initiated fault-tolerant "
+ + "executions (FATE). These are primarily table operations like
merge. Each key/value "
+ + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
+ + "more FATE operations and each value is the number of threads that
will be assigned "
+ + "to the pool.",
+ "4.0.0"),
+ MANAGER_META_FATE_CONFIG("manager.meta.fate.config",
+ "{\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+ + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT\": 2}",
+ PropertyType.META_FATE_CONFIG,
+ "The number of threads used to run system-initiated fault-tolerant "
+ + "executions (FATE). These are primarily table operations like
merge. Each key/value "
+ + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
+ + "more FATE operations and each value is the number of threads that
will be assigned "
+ + "to the pool.",
+ "4.0.0"),
Review Comment:
> What is the proper way to deprecate the old MANAGER_FATE_THREADPOOL_SIZE
property
I think you can just mark it deprecated in the 3.1 branch. I don't think you
want to use the ReplacedBy annotation on it, but instead make a comment in the
3.1 code that it's going to be removed in 4.0 and replaced by these new
properties. In the 4.0 branch, remove the property.
##########
core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java:
##########
@@ -393,4 +411,60 @@ public static IntStream parse(String portRange) {
}
+ private static class ValidFateConfig implements Predicate<String> {
+ private final Set<Fate.FateOperation> allFateOps;
+
+ private ValidFateConfig(Set<Fate.FateOperation> allFateOps) {
+ this.allFateOps = allFateOps;
+ }
+
+ @Override
+ public boolean test(String s) {
+ final Set<Fate.FateOperation> seenFateOps;
+
+ try {
+ final var json = JsonParser.parseString(s).getAsJsonObject();
+ seenFateOps = new HashSet<>();
+
+ for (var entry : json.entrySet()) {
+ var key = entry.getKey();
+ var val = entry.getValue().getAsInt();
+ if (val <= 0) {
+ return false;
+ }
+ var fateOpsStrArr = key.split(",");
+ for (String fateOpStr : fateOpsStrArr) {
+ Fate.FateOperation fateOp = Fate.FateOperation.valueOf(fateOpStr);
+ if (seenFateOps.contains(fateOp)) {
+ return false;
+ }
+ seenFateOps.add(fateOp);
+ }
+ }
+ } catch (Exception e) {
+ return false;
+ }
+
+ return allFateOps.equals(seenFateOps);
+ }
+ }
+
+ private static class ValidUserFateConfig extends ValidFateConfig {
+ private static final Set<Fate.FateOperation> allFateOps =
+ Fate.FateOperation.getAllUserFateOps();
+
+ private ValidUserFateConfig() {
+ super(allFateOps);
+ }
+ }
+
+ private static class ValidMetaFateConfig extends ValidFateConfig {
+ private static final Set<Fate.FateOperation> allFateOps =
+ Fate.FateOperation.getAllMetaFateOps();
+
+ private ValidMetaFateConfig() {
+ super(allFateOps);
Review Comment:
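Same comment as on ValidUserFateConfig: I don't think we need to create a
static variable here that just references another static variable; the call
Fate.FateOperation.getAllMetaFateOps() can be passed to super(...) directly.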
##########
core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java:
##########
@@ -393,4 +411,60 @@ public static IntStream parse(String portRange) {
}
+ private static class ValidFateConfig implements Predicate<String> {
+ private final Set<Fate.FateOperation> allFateOps;
+
+ private ValidFateConfig(Set<Fate.FateOperation> allFateOps) {
+ this.allFateOps = allFateOps;
+ }
+
+ @Override
+ public boolean test(String s) {
+ final Set<Fate.FateOperation> seenFateOps;
+
+ try {
+ final var json = JsonParser.parseString(s).getAsJsonObject();
+ seenFateOps = new HashSet<>();
+
+ for (var entry : json.entrySet()) {
+ var key = entry.getKey();
+ var val = entry.getValue().getAsInt();
+ if (val <= 0) {
+ return false;
+ }
+ var fateOpsStrArr = key.split(",");
+ for (String fateOpStr : fateOpsStrArr) {
+ Fate.FateOperation fateOp = Fate.FateOperation.valueOf(fateOpStr);
+ if (seenFateOps.contains(fateOp)) {
+ return false;
+ }
+ seenFateOps.add(fateOp);
+ }
+ }
+ } catch (Exception e) {
+ return false;
+ }
+
+ return allFateOps.equals(seenFateOps);
+ }
+ }
+
+ private static class ValidUserFateConfig extends ValidFateConfig {
+ private static final Set<Fate.FateOperation> allFateOps =
+ Fate.FateOperation.getAllUserFateOps();
+
+ private ValidUserFateConfig() {
+ super(allFateOps);
Review Comment:
I don't think we need to create a static variable here that just references
another static variable.
```suggestion
super(Fate.FateOperation.getAllUserFateOps());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]