keith-turner commented on a change in pull request #2278:
URL: https://github.com/apache/accumulo/pull/2278#discussion_r713158377



##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -226,11 +224,26 @@ public Fate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFun
    * Launches the specified number of worker threads.
    */
   public void startTransactionRunners(AccumuloConfiguration conf) {
-    int numThreads = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-    executor = ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
-    for (int i = 0; i < numThreads; i++) {
-      executor.execute(new TransactionRunner());
-    }
+    final ThreadPoolExecutor pool = (ThreadPoolExecutor) ThreadPools.createExecutorService(conf,
+        Property.MANAGER_FATE_THREADPOOL_SIZE);
+    fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
+    fatePoolWatcher.schedule(() -> {
+      ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
+      // Assume a thread could execute up to 100 operations a second.
+      // We are sleeping for three seconds. So to calculate max to queue do 3* 100 * numThreads.
+      int maxToQueue = 300 * conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
+      int remaining = maxToQueue - pool.getQueue().size();
+      for (int i = 0; i < remaining; i++) {
+        try {
+          pool.execute(new TransactionRunner());
+        } catch (RejectedExecutionException e) {
+          // RejectedExecutionException could be shutting down
+          log.warn("Error adding transaction runner to FaTE executor pool.");

Review comment:
       I think if the pool is shut down we can avoid spamming the logs. If it's
not shut down, then it's really bad and we should include the exception. For the
case where it's not shut down, should we log an error or rethrow the exception?
   
   ```suggestion
             if (pool.isShutdown()) {
               // The exception is expected in this case, no need to spam the logs.
               log.trace("Error adding transaction runner to FaTE executor pool.", e);
             } else {
               // This is bad, FaTE may no longer work!
               log.error("Error adding transaction runner to FaTE executor pool.", e);
             }
   ```
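
   To make the two rejection cases concrete, here is a minimal standalone sketch using only `java.util.concurrent`. The class `RejectionSketch`, the `Outcome` enum, and the helpers `tryExecute` and `newTinyPool` are hypothetical names for illustration and are not part of Accumulo. The sketch returns an outcome where the suggestion above would log, so the behavior is easy to check; whether the non-shutdown case should log or rethrow is left open, as in the question above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionSketch {

  /** Hypothetical outcome type; the real code would log instead of returning a value. */
  public enum Outcome { ACCEPTED, REJECTED_AFTER_SHUTDOWN, REJECTED_WHILE_RUNNING }

  /** Tries to hand a task to the pool and classifies a rejection by pool state. */
  public static Outcome tryExecute(ThreadPoolExecutor pool, Runnable task) {
    try {
      pool.execute(task);
      return Outcome.ACCEPTED;
    } catch (RejectedExecutionException e) {
      // Expected once the pool is shut down (trace level in the suggestion);
      // otherwise unexpected (error level, or possibly a rethrow).
      return pool.isShutdown() ? Outcome.REJECTED_AFTER_SHUTDOWN
          : Outcome.REJECTED_WHILE_RUNNING;
    }
  }

  /** One worker and one queue slot, so a third pending task is rejected while running. */
  public static ThreadPoolExecutor newTinyPool() {
    return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = newTinyPool();
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await(); // hold the worker until released
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    System.out.println(tryExecute(pool, blocker)); // occupies the single worker
    System.out.println(tryExecute(pool, blocker)); // fills the single queue slot
    System.out.println(tryExecute(pool, blocker)); // rejected, pool still running
    release.countDown();
    pool.shutdown();
    System.out.println(tryExecute(pool, () -> {})); // rejected, pool shut down
  }
}
```

   The bounded queue is only there to force a rejection on a live pool; the point is that `isShutdown()` is enough to tell the expected case from the alarming one.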

##########
File path: server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
##########
@@ -323,16 +323,11 @@ public static ThreadPoolExecutor createSelfResizingThreadPool(final String serve
       // however, this isn't really an issue, since it adjusts periodically anyway
       if (pool.getCorePoolSize() <= pool.getActiveCount()) {
         int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
-        log.info("Increasing server thread pool size on {} to {}", serverName, larger);
-        pool.setMaximumPoolSize(larger);
-        pool.setCorePoolSize(larger);
+        ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
       } else {
         if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
           int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
-          if (smaller != pool.getCorePoolSize()) {
-            log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
-            pool.setCorePoolSize(smaller);
-          }
+          ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");

Review comment:
       I have not looked at the context to see if this matters, but I noticed
that this branch went from only calling `pool.setCorePoolSize()` to calling both
`pool.setMaximumPoolSize(...)` and `pool.setCorePoolSize()`.
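
   For reference, a small standalone sketch of the observable difference between the two approaches on a plain `ThreadPoolExecutor`. The class `PoolResizeSketch` and the helper `resizeBoth` are hypothetical and are not Accumulo's `ThreadPools.resizePool`, whose implementation is not shown in this diff. The helper orders its calls so that core never exceeds max at any step, since `setMaximumPoolSize` rejects a value below the core size and recent JDKs also reject a core size above the maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolResizeSketch {

  /**
   * Hypothetical helper: sets both core and maximum pool size to newSize,
   * keeping core <= max after every individual call.
   */
  public static void resizeBoth(ThreadPoolExecutor pool, int newSize) {
    if (newSize > pool.getMaximumPoolSize()) {
      pool.setMaximumPoolSize(newSize); // growing: raise the ceiling first
      pool.setCorePoolSize(newSize);
    } else {
      pool.setCorePoolSize(newSize); // shrinking: lower the floor first
      pool.setMaximumPoolSize(newSize);
    }
  }

  private static String sizes(ThreadPoolExecutor pool) {
    return "core=" + pool.getCorePoolSize() + " max=" + pool.getMaximumPoolSize();
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(4, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    // Old shrink path: only the core size changes, the maximum stays at 4.
    pool.setCorePoolSize(3);
    System.out.println("core only:   " + sizes(pool));

    // Shrink path that touches both: the maximum follows the core size down.
    resizeBoth(pool, 2);
    System.out.println("shrink both: " + sizes(pool));

    // Growing through the same helper raises both together.
    resizeBoth(pool, 6);
    System.out.println("grow both:   " + sizes(pool));

    pool.shutdown();
  }
}
```

   Whether the lower maximum matters in `createSelfResizingThreadPool` depends on the queue the pool is built with, which this hunk does not show.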



