keith-turner commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1956570688
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -144,260 +141,175 @@ public TFateOperation toThrift() {
}
return top;
}
+
+ public static Set<FateOperation> getAllUserFateOps() {
+ return allUserFateOps;
+ }
+
+ public static Set<FateOperation> getAllMetaFateOps() {
+ return allMetaFateOps;
+ }
}
- /**
- * A single thread that finds transactions to work on and queues them up. Do
not want each worker
- * thread going to the store and looking for work as it would place more
load on the store.
- */
- private class WorkFinder implements Runnable {
+ // The fate pools watcher:
+ // - Maintains a TransactionRunner per available thread per
pool/FateExecutor. Does so by
+ // periodically checking the pools for an inactive thread (i.e., a thread
running a
+ // TransactionRunner died or the pool size was increased in the property),
resizing the pool and
+ // submitting new runners as needed. Also safely stops the necessary number
of TransactionRunners
+ // if the pool size in the property was decreased.
+ // - Warns the user to consider increasing the pool size (or splitting the
fate ops assigned to
+ // that pool into separate pools) for any pool that does not often have any
idle threads.
+ private class FatePoolsWatcher implements Runnable {
+ private final T environment;
+ private final AccumuloConfiguration conf;
+
+ private FatePoolsWatcher(T environment, AccumuloConfiguration conf) {
+ this.environment = environment;
+ this.conf = conf;
+ }
@Override
public void run() {
- while (keepRunning.get()) {
- try {
- store.runnable(keepRunning, fateId -> {
- while (keepRunning.get()) {
- try {
- // The reason for calling transfer instead of queueing is
avoid rescanning the
- // storage layer and adding the same thing over and over. For
example if all threads
- // were busy, the queue size was 100, and there are three
runnable things in the
- // store. Do not want to keep scanning the store adding those
same 3 runnable things
- // until the queue is full.
- if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) {
- break;
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
+ // Read from the config here and here only. Must avoid reading the same
property from the
+ // config more than once since it can change at any point in this
execution
+ var poolConfigs = getPoolConfigurations(conf);
+ var idleCheckIntervalMillis =
conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
+
+ // shutdown task: shutdown fate executors whose set of fate operations
are no longer present
+ // in the config
+ synchronized (fateExecutors) {
+ final var fateExecutorsIter = fateExecutors.iterator();
+ while (fateExecutorsIter.hasNext()) {
+ var fateExecutor = fateExecutorsIter.next();
+
+ // if this fate executors set of fate ops is no longer present in
the config...
+ if (!poolConfigs.containsKey(fateExecutor.getFateOps())) {
+ if (!fateExecutor.isShutdown()) {
+ log.debug("The config for {} has changed invalidating {}.
Gracefully shutting down "
+ + "the FateExecutor.", getFateConfigProp(), fateExecutor);
+ fateExecutor.initiateShutdown();
+ } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
+ log.debug("{} has been shutdown, but is still actively working
on transactions.",
+ fateExecutor);
+ } else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) {
+ log.debug("{} has been shutdown and all threads have safely
terminated.",
+ fateExecutor);
+ fateExecutorsIter.remove();
}
- });
- } catch (Exception e) {
- if (keepRunning.get()) {
- log.warn("Failure while attempting to find work for fate", e);
- } else {
- log.debug("Failure while attempting to find work for fate", e);
}
-
- workQueue.clear();
}
}
- }
- }
-
- private class TransactionRunner implements Runnable {
- // used to signal a TransactionRunner to stop in the case where there are
too many running
- // i.e., the property for the pool size decreased and we have excess
TransactionRunners
- private final AtomicBoolean stop = new AtomicBoolean(false);
- private Optional<FateTxStore<T>> reserveFateTx() throws
InterruptedException {
- while (keepRunning.get() && !stop.get()) {
- FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);
-
- if (unreservedFateId == null) {
- continue;
- }
- var optionalopStore = store.tryReserve(unreservedFateId);
- if (optionalopStore.isPresent()) {
- return optionalopStore;
+ // replacement task: at this point, the existing FateExecutors that were
invalidated by the
+ // config changes have started shutdown or finished shutdown. Now create
any new replacement
+ // FateExecutors needed
+ for (var poolConfig : poolConfigs.entrySet()) {
+ var configFateOps = poolConfig.getKey();
+ var configPoolSize = poolConfig.getValue();
+ synchronized (fateExecutors) {
+ if (fateExecutors.stream().map(FateExecutor::getFateOps)
+ .noneMatch(fo -> fo.equals(configFateOps))) {
+ fateExecutors
+ .add(new FateExecutor<>(Fate.this, environment, configFateOps,
configPoolSize));
+ }
}
}
- return Optional.empty();
- }
-
- @Override
- public void run() {
- runningTxRunners.add(this);
- try {
- while (keepRunning.get() && !stop.get()) {
- FateTxStore<T> txStore = null;
- ExecutionState state = new ExecutionState();
- try {
- var optionalopStore = reserveFateTx();
- if (optionalopStore.isPresent()) {
- txStore = optionalopStore.orElseThrow();
- } else {
- continue;
- }
- state.status = txStore.getStatus();
- state.op = txStore.top();
- if (state.status == FAILED_IN_PROGRESS) {
- processFailed(txStore, state.op);
- } else if (state.status == SUBMITTED || state.status ==
IN_PROGRESS) {
+ // resize task: For each fate executor, resize the pool to match the
config as necessary and
+ // submit new TransactionRunners if the pool grew, stop
TransactionRunners if the pool
+ // shrunk, and potentially suggest resizing the pool if the load is
consistently high.
+ synchronized (fateExecutors) {
+ for (var fateExecutor : fateExecutors) {
+ if (fateExecutor.isShutdown()) {
+ continue;
+ }
+ final var pool = fateExecutor.getTransactionExecutor();
+ final var poolName = fateExecutor.getPoolName();
+ final var runningTxRunners = fateExecutor.getRunningTxRunners();
+ final int configured = poolConfigs.get(fateExecutor.getFateOps());
+ ThreadPools.resizePool(pool, () -> configured, poolName);
+ final int needed = configured - runningTxRunners.size();
Review Comment:
Could all of this code for resizing be pushed into FateExecutor? Seems like
a lot of things are requested from fateExecutor for the code here, but not sure
if this code is exclusively working w/ refs from fateExecutor. Wondering if here
we could call `fateExecutor.updateSizeConfig(configured)` and then the
method inside FateExecutor takes care of starting/stopping threads if
needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]