keith-turner commented on a change in pull request #2278:
URL: https://github.com/apache/accumulo/pull/2278#discussion_r713158377
##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -226,11 +224,26 @@ public Fate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFun
* Launches the specified number of worker threads.
*/
public void startTransactionRunners(AccumuloConfiguration conf) {
- int numThreads = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
- executor = ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
- for (int i = 0; i < numThreads; i++) {
- executor.execute(new TransactionRunner());
- }
+ final ThreadPoolExecutor pool = (ThreadPoolExecutor) ThreadPools.createExecutorService(conf,
+ Property.MANAGER_FATE_THREADPOOL_SIZE);
+ fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
+ fatePoolWatcher.schedule(() -> {
+ ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
+ // Assume a thread could execute up to 100 operations a second.
+ // We are sleeping for three seconds. So to calculate max to queue do 3 * 100 * numThreads.
+ int maxToQueue = 300 * conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
+ int remaining = maxToQueue - pool.getQueue().size();
+ for (int i = 0; i < remaining; i++) {
+ try {
+ pool.execute(new TransactionRunner());
+ } catch (RejectedExecutionException e) {
+ // RejectedExecutionException could be shutting down
+ log.warn("Error adding transaction runner to FaTE executor pool.");
Review comment:
If the pool is shut down, I think we can avoid spamming the logs. If it's not
shut down, then it's really bad and we should include the exception. For the
case where it's not shut down, should we log an error or rethrow the exception?
```suggestion
if (pool.isShutdown()) {
  // The exception is expected in this case, no need to spam the logs.
  log.trace("Error adding transaction runner to FaTE executor pool.", e);
} else {
  // This is bad, FaTE may no longer work!
  log.error("Error adding transaction runner to FaTE executor pool.", e);
}
```
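To illustrate the two cases the suggestion distinguishes, here is a minimal standalone sketch using only `java.util.concurrent`. The class `RejectedLogLevelDemo` and the helper `classify` are hypothetical names made up for this example, and the method returns the log level as a string instead of calling a logger; it is not the Fate code itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedLogLevelDemo {

  // Hypothetical helper: submits the task and reports which log level the
  // suggestion above would choose if the submission is rejected.
  static String classify(ExecutorService pool, Runnable task) {
    try {
      pool.execute(task);
      return "OK";
    } catch (RejectedExecutionException e) {
      // Rejection after shutdown is expected; rejection on a live pool is not.
      return pool.isShutdown() ? "TRACE" : "ERROR";
    }
  }

  public static void main(String[] args) {
    // Case 1: the pool has been shut down, so rejection is expected.
    ExecutorService stopped = Executors.newFixedThreadPool(1);
    stopped.shutdown();
    System.out.println(classify(stopped, () -> {}));

    // Case 2: a live pool with one worker and a one-slot queue that is saturated.
    ThreadPoolExecutor full =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await();
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    };
    classify(full, blocker); // occupies the single worker thread
    classify(full, blocker); // fills the single queue slot
    System.out.println(classify(full, blocker)); // rejected while still running

    release.countDown();
    full.shutdown();
  }
}
```

Run as written, this should print `TRACE` and then `ERROR`. Whether the second case should log or rethrow is still the open question above.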
##########
File path: server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
##########
@@ -323,16 +323,11 @@ public static ThreadPoolExecutor createSelfResizingThreadPool(final String serve
// however, this isn't really an issue, since it adjusts periodically anyway
if (pool.getCorePoolSize() <= pool.getActiveCount()) {
int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
- log.info("Increasing server thread pool size on {} to {}", serverName, larger);
- pool.setMaximumPoolSize(larger);
- pool.setCorePoolSize(larger);
+ ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
} else {
if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
- if (smaller != pool.getCorePoolSize()) {
- log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
- pool.setCorePoolSize(smaller);
- }
+ ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
Review comment:
I have not looked at the context to see if this matters, but I noticed this
branch went from only calling `pool.setCorePoolSize()` to calling both
`pool.setMaximumPoolSize(...)` and `pool.setCorePoolSize()`.
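
To make the difference concrete, here is a small standalone sketch against a plain `ThreadPoolExecutor`. The class `ResizeDemo` and the methods `shrinkCoreOnly` and `resizeBoth` are hypothetical; `resizeBoth` is only my assumption of what a resize that sets both values might look like, not the actual `ThreadPools.resizePool` implementation.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeDemo {

  // Mirrors the removed shrink path: only the core size is lowered.
  static void shrinkCoreOnly(ThreadPoolExecutor pool, int smaller) {
    pool.setCorePoolSize(smaller);
  }

  // Assumed shape of a resize that sets both values (hypothetical, see lead-in).
  // The order keeps core <= max at every step, which ThreadPoolExecutor requires.
  static void resizeBoth(ThreadPoolExecutor pool, int size) {
    if (size > pool.getMaximumPoolSize()) {
      pool.setMaximumPoolSize(size); // growing: raise the ceiling first
      pool.setCorePoolSize(size);
    } else {
      pool.setCorePoolSize(size); // shrinking: lower the core first
      pool.setMaximumPoolSize(size);
    }
  }

  static ThreadPoolExecutor newPool(int size) {
    return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
  }

  public static void main(String[] args) {
    ThreadPoolExecutor a = newPool(5);
    shrinkCoreOnly(a, 4);
    System.out.println("core=" + a.getCorePoolSize() + " max=" + a.getMaximumPoolSize());

    ThreadPoolExecutor b = newPool(5);
    resizeBoth(b, 4);
    System.out.println("core=" + b.getCorePoolSize() + " max=" + b.getMaximumPoolSize());

    a.shutdown();
    b.shutdown();
  }
}
```

This should print `core=4 max=5` for the core-only path and `core=4 max=4` when both are set. With an unbounded work queue the maximum is rarely reached, so the change may be harmless here, but I have not checked which queue this pool uses.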