keith-turner commented on code in PR #5263:
URL: https://github.com/apache/accumulo/pull/5263#discussion_r1917453007
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -179,9 +187,25 @@ public void run() {
}
private class TransactionRunner implements Runnable {
+ private final AtomicReference<RunnerState> runnerState =
+ new AtomicReference<>(new RunnerState(false, false));
+
+ private class RunnerState {
+ // used to signal a TransactionRunner to stop in the case where there are too many running
+ // i.e., the property for the pool size decreased and we have excess TransactionRunners
+ private final boolean stop;
+ // whether the TransactionRunner is executing the run() method (may or may not be working on
+ // a tx, could be waiting for a tx)
+ private final boolean isRunning;
Review Comment:
Wondering about changing this to track `exited`, which would be set to true in a `finally`
block. `isRunning` is false both before the runner has started and after it has exited; tracking
whether it has exited would distinguish between these two cases. Then, if a thread was not
stopped but exited because of an exception, we would know it is finished.
```suggestion
private final boolean exited;
```
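A minimal, self-contained sketch of the `exited` idea. `ExitTrackingRunner`, its `work` callback, and the accessor names are hypothetical stand-ins for `TransactionRunner`, not Accumulo code. The state is an immutable object swapped atomically, and `exited` is set in a `finally` block, so a runner that dies from an exception is reported as finished even though it was never asked to stop.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Fate.TransactionRunner, for illustration only.
class ExitTrackingRunner implements Runnable {
  // Immutable snapshot of the runner's state; swapped atomically as a whole.
  static final class RunnerState {
    final boolean stop; // asked to stop, e.g. because the pool shrank
    final boolean exited; // run() has returned or thrown

    RunnerState(boolean stop, boolean exited) {
      this.stop = stop;
      this.exited = exited;
    }
  }

  private final AtomicReference<RunnerState> runnerState =
      new AtomicReference<>(new RunnerState(false, false));
  private final Runnable work; // hypothetical: one unit of work, e.g. processing one tx

  ExitTrackingRunner(Runnable work) {
    this.work = work;
  }

  boolean isStopped() {
    return runnerState.get().stop;
  }

  boolean isExited() {
    return runnerState.get().exited;
  }

  @Override
  public void run() {
    try {
      while (!isStopped()) {
        work.run(); // an exception thrown here leaves the loop without setting stop
      }
    } finally {
      // Runs on normal return and on exceptions, so exited is always recorded.
      runnerState.updateAndGet(s -> new RunnerState(s.stop, true));
    }
  }
}
```

A runner that has never started reports `isExited() == false`, while one whose work threw reports `isExited() == true` and `isStopped() == false`. That is exactly the case `isRunning` alone cannot tell apart from "not started yet".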
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -425,11 +474,36 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
}
}
idleCountHistory.clear();
+ } else if (needed < 0) {
+ // If we need the pool to shrink, then ensure excess TransactionRunners are safely stopped.
Review Comment:
Maybe, instead of trying to stop and then unstop threads and sleeping, we could just try to get
`stop` set on a goal number of threads, without waiting for them to exit. Maybe something like
the following:
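```java
// runners that were already told to stop but have not exited yet
long numStopped =
    runningTxRunners.stream().filter(tr -> tr.isStopped()).count();
// needed is negative here: -needed runners are in excess, minus those already stopping
long numToStop = -needed - numStopped;
for (var tr : runningTxRunners) {
  if (numToStop <= 0) {
    break;
  }
  // returns true if it was not already stopped
  if (tr.setStop()) {
    numToStop--;
  }
}
```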
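A self-contained sketch of the two pieces the snippet relies on, assuming the `RunnerState(stop, exited)` shape suggested earlier. First, a hypothetical `setStop()` that returns true only when this call flips the runner from not-stopped to stopped. It uses `compareAndSet`, so two callers can never both count the same runner. Second, the goal-based loop applied to a list. `ShrinkSketch`, `Runner`, and `shrink` are illustrative names, not Accumulo code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of "stop a goal number of runners"; names are illustrative.
class ShrinkSketch {
  static final class RunnerState {
    final boolean stop;
    final boolean exited;

    RunnerState(boolean stop, boolean exited) {
      this.stop = stop;
      this.exited = exited;
    }
  }

  static final class Runner {
    private final AtomicReference<RunnerState> state =
        new AtomicReference<>(new RunnerState(false, false));

    boolean isStopped() {
      return state.get().stop;
    }

    // Returns true only if this call changed the runner from not-stopped to stopped.
    boolean setStop() {
      while (true) {
        RunnerState cur = state.get();
        if (cur.stop) {
          return false; // already asked to stop by an earlier call
        }
        if (state.compareAndSet(cur, new RunnerState(true, cur.exited))) {
          return true;
        }
        // lost a race (e.g. the runner recorded exited concurrently); re-read and retry
      }
    }
  }

  // needed = configured - runners.size(); when negative there are -needed excess runners.
  // Marks just enough additional runners as stopped and does not wait for them to exit.
  static int shrink(List<Runner> runners, int needed) {
    long numStopped = runners.stream().filter(Runner::isStopped).count();
    long numToStop = -needed - numStopped;
    int newlyStopped = 0;
    for (Runner tr : runners) {
      if (numToStop <= 0) {
        break;
      }
      if (tr.setStop()) {
        numToStop--;
        newlyStopped++;
      }
    }
    return newlyStopped;
  }
}
```

Because runners that are already stopping are subtracted first, calling `shrink` again on the next watcher tick with the same `needed` stops nothing new. Repeated ticks therefore converge on the goal instead of over-stopping.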
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -397,24 +443,27 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
this.store = FateLogger.wrap(store, toLogStrFunc, false);
this.environment = environment;
- final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
+ this.transactionExecutor = ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
this.workQueue = new LinkedTransferQueue<>();
+ this.runningTxRunners = new ArrayList<>();
this.fatePoolWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
// resize the pool if the property changed
- ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
- // If the pool grew, then ensure that there is a TransactionRunner for each thread
+ ThreadPools.resizePool(transactionExecutor, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
final int configured =
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int needed = configured - pool.getActiveCount();
+ final int needed = configured - transactionExecutor.getActiveCount();
Review Comment:
Maybe, instead of using the active count, we could use the size of the list, first removing any
runners that have exited (if the `exited` flag is added).
```suggestion
// remove any threads that have exited
runningTxRunners.removeIf(tr->tr.isExited());
final int needed = configured - runningTxRunners.size();
```
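A small sketch of how one watcher tick could size the pool from the list rather than from `getActiveCount()`. It assumes only the watcher thread mutates the list, so a plain `ArrayList` suffices. `WatcherTickSketch`, the one-method `Runner` interface, and the `startNew` supplier are hypothetical; in `Fate`, `startNew` would presumably create a `TransactionRunner` and submit it to `transactionExecutor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the watcher's sizing step; names are illustrative, not Accumulo code.
class WatcherTickSketch {
  // Minimal view of a runner: only whether it has exited matters for sizing.
  interface Runner {
    boolean isExited();
  }

  // Assumes only the watcher thread mutates `runners`.
  // Returns `needed`; a negative value means the pool should shrink by -needed runners.
  static int tick(List<Runner> runners, int configured, Supplier<Runner> startNew) {
    // Forget runners that finished, whether they were stopped or died from an exception.
    runners.removeIf(Runner::isExited);
    int needed = configured - runners.size();
    // Grow: start one new runner per missing slot.
    for (int i = 0; i < needed; i++) {
      runners.add(startNew.get());
    }
    return needed;
  }
}
```

Shrinking only signals `stop` (as in the goal-based loop). Stopped runners stay in the list until they set `exited`, and they are pruned on a later tick. This keeps `needed` from overshooting while they wind down.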
Also wondering about creating a cached thread pool (see `Executors.newCachedThreadPool()`) and
not bothering to resize the pool at all: just add runners to the pool and stop them, and it will
create threads as needed. That may be too much of a change for this PR, but it would simplify
the code by reducing the amount of state that needs to be kept in sync.
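A JDK-only sketch of the cached-pool idea. It uses `Executors.newCachedThreadPool()` directly and does not assume any particular factory in Accumulo's `ThreadPools` wrapper. `CachedPoolSketch`, `startWorkers`, `stopAll`, and the shared `AtomicBoolean` are illustrative; in `Fate` each runner would more likely carry its own stop flag. Each submitted worker gets a thread on demand, so there is no core/maximum size to keep in sync with `MANAGER_FATE_THREADPOOL_SIZE`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the cached-pool idea using only the JDK; names are illustrative.
class CachedPoolSketch {
  // Submits `count` long-lived workers that run until `stop` is set. A cached pool creates a
  // new thread whenever no idle one is available, so there is no pool size to resize.
  // Returns true if every worker was running within the timeout.
  static boolean startWorkers(ExecutorService pool, int count, AtomicBoolean stop) {
    CountDownLatch started = new CountDownLatch(count);
    for (int i = 0; i < count; i++) {
      pool.execute(() -> {
        started.countDown();
        while (!stop.get()) {
          try {
            Thread.sleep(10); // stands in for waiting on or working a transaction
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      });
    }
    try {
      return started.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Signals the workers to stop and waits for the pool to drain. Idle cached-pool threads
  // would otherwise be reclaimed after 60 seconds; shutdown() lets them exit right away.
  static boolean stopAll(ExecutorService pool, AtomicBoolean stop) {
    stop.set(true);
    pool.shutdown();
    try {
      return pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With this approach, growing means submitting more runners and shrinking means setting `stop` on some of them. The pool's thread count follows the number of live runners without an explicit resize.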