keith-turner commented on a change in pull request #2278:
URL: https://github.com/apache/accumulo/pull/2278#discussion_r712548893



##########
File path: 
core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -29,33 +29,30 @@
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ThreadPools {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
+
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
-  private static void makeResizeable(final ThreadPoolExecutor pool,
-      final AccumuloConfiguration conf, final Property p) {
-    final String threadName = p.name().concat("_watcher");
-    Threads.createThread(threadName, () -> {
-      int count = conf.getCount(p);
-      while (Thread.currentThread().isAlive() && 
!Thread.currentThread().isInterrupted()) {
-        try {
-          Thread.sleep(1000);
-          int newCount = conf.getCount(p);
-          if (newCount != count) {
-            pool.setCorePoolSize(newCount);
-            pool.setMaximumPoolSize(newCount);
-            count = newCount;
-          }
-        } catch (InterruptedException e) {
-          // throw a RuntimeException and let the 
AccumuloUncaughtExceptionHandler deal with it.
-          throw new RuntimeException("Thread " + threadName + " was 
interrupted.");
-        }
-      }
-    }).start();
-
+  public static void makeResizeable(final ThreadPoolExecutor pool, final 
AccumuloConfiguration conf,

Review comment:
       This method name seems off w/ the changes.
   ```suggestion
     public static void resizePool(final ThreadPoolExecutor pool, final 
AccumuloConfiguration conf,
   ```

##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -226,11 +223,17 @@ public Fate(T environment, TStore<T> store, 
Function<Repo<T>,String> toLogStrFun
    * Launches the specified number of worker threads.
    */
   public void startTransactionRunners(AccumuloConfiguration conf) {
-    int numThreads = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-    executor = ThreadPools.createExecutorService(conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
-    for (int i = 0; i < numThreads; i++) {
-      executor.execute(new TransactionRunner());
-    }
+    final ThreadPoolExecutor pool = (ThreadPoolExecutor) 
ThreadPools.createExecutorService(conf,
+        Property.MANAGER_FATE_THREADPOOL_SIZE);
+    fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);

Review comment:
       The Accumulo servers used to have a shared timer pool.  Not sure if that 
is still a thing, if it is could be used to avoid creating these watcher 
threads.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -29,33 +29,30 @@
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ThreadPools {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
+
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
-  private static void makeResizeable(final ThreadPoolExecutor pool,
-      final AccumuloConfiguration conf, final Property p) {
-    final String threadName = p.name().concat("_watcher");
-    Threads.createThread(threadName, () -> {
-      int count = conf.getCount(p);
-      while (Thread.currentThread().isAlive() && 
!Thread.currentThread().isInterrupted()) {
-        try {
-          Thread.sleep(1000);
-          int newCount = conf.getCount(p);
-          if (newCount != count) {
-            pool.setCorePoolSize(newCount);
-            pool.setMaximumPoolSize(newCount);
-            count = newCount;
-          }
-        } catch (InterruptedException e) {
-          // throw a RuntimeException and let the 
AccumuloUncaughtExceptionHandler deal with it.
-          throw new RuntimeException("Thread " + threadName + " was 
interrupted.");
-        }
-      }
-    }).start();
-
+  public static void makeResizeable(final ThreadPoolExecutor pool, final 
AccumuloConfiguration conf,
+      final Property p) {
+    int count = pool.getMaximumPoolSize();
+    int newCount = conf.getCount(p);

Review comment:
       Thinking the following may avoid logging when there is nothing to do.
   
   ```suggestion
       int newCount = conf.getCount(p);
       if(count == newCount) 
          return;
   ```

##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -226,11 +223,17 @@ public Fate(T environment, TStore<T> store, 
Function<Repo<T>,String> toLogStrFun
    * Launches the specified number of worker threads.
    */
   public void startTransactionRunners(AccumuloConfiguration conf) {
-    int numThreads = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-    executor = ThreadPools.createExecutorService(conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
-    for (int i = 0; i < numThreads; i++) {
-      executor.execute(new TransactionRunner());
-    }
+    final ThreadPoolExecutor pool = (ThreadPoolExecutor) 
ThreadPools.createExecutorService(conf,
+        Property.MANAGER_FATE_THREADPOOL_SIZE);
+    fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
+    fatePoolWatcher.schedule(() -> {
+      ThreadPools.makeResizeable(pool, conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
+      int remaining = 1000 - pool.getQueue().size();
+      for (int i = 0; i < remaining; i++) {
+        pool.execute(new TransactionRunner());

Review comment:
       This may throw a bunch of exceptions if the pool is shutdown.  Would be 
nice to avoid that.  Could wait for `fatePoolWatcher` to shutdown before 
calling `executor.shutdown()` in the shutdown() method below in this class.  
That might avoid the exceptions here.
   
   Alternatively could catch rejectedexecutionexception here and ignore it if 
the pool is shutdown.

##########
File path: core/src/main/java/org/apache/accumulo/fate/Fate.java
##########
@@ -226,11 +223,17 @@ public Fate(T environment, TStore<T> store, 
Function<Repo<T>,String> toLogStrFun
    * Launches the specified number of worker threads.
    */
   public void startTransactionRunners(AccumuloConfiguration conf) {
-    int numThreads = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-    executor = ThreadPools.createExecutorService(conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
-    for (int i = 0; i < numThreads; i++) {
-      executor.execute(new TransactionRunner());
-    }
+    final ThreadPoolExecutor pool = (ThreadPoolExecutor) 
ThreadPools.createExecutorService(conf,
+        Property.MANAGER_FATE_THREADPOOL_SIZE);
+    fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
+    fatePoolWatcher.schedule(() -> {
+      ThreadPools.makeResizeable(pool, conf, 
Property.MANAGER_FATE_THREADPOOL_SIZE);
+      int remaining = 1000 - pool.getQueue().size();

Review comment:
       Could replace 1000 w/ a function of the numThreads and time like below.
   
   ```suggestion
         // Assume a thread could execute up to 100 operations a second.  We 
are sleeping for three seconds. So to calculate max to queue do 3* 100 * 
numThreads. 
         int maxToQueue = 300 * conf.getCount(  
Property.MANAGER_FATE_THREADPOOL_SIZE);
         int remaining = 1000 - pool.getQueue().size();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to