brianloss commented on a change in pull request #1818:
URL: https://github.com/apache/accumulo/pull/1818#discussion_r555101347



##########
File path: core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
##########
@@ -59,18 +60,20 @@ public DefaultContextClassLoaderFactory(final AccumuloConfiguration accConf) {
   }
 
   private static void startCleanupThread(final Supplier<Map<String,String>> contextConfigSupplier) {
-    new Timer(className + "-cleanup", true).scheduleAtFixedRate(new TimerTask() {
-      @Override
-      public void run() {
-        Map<String,String> contextConfigs = contextConfigSupplier.get();
-        LOG.trace("{}-cleanup thread, properties: {}", className, contextConfigs);
-        int prefixlen = Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey().length();
-        Set<String> contextsInUse = contextConfigs.keySet().stream()
-            .map(k -> k.substring(prefixlen)).collect(Collectors.toSet());
-        LOG.trace("{}-cleanup thread, contexts in use: {}", className, contextsInUse);
-        AccumuloVFSClassLoader.removeUnusedContexts(contextsInUse);
-      }
-    }, 60_000, 60_000);
+    final ConfigurationCopy cc = new ConfigurationCopy(contextConfigSupplier.get());
+    String size = cc.get(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+    if (null == size || size.isEmpty()) {
+      cc.set(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE,
+          Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE.getDefaultValue());
+    }

Review comment:
       It looks like calling `ThreadPools.getGeneralScheduledExecutorService`
depends on the configuration having a value for
`Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE`. Should this defaulting logic be
moved into `ThreadPools.getGeneralScheduledExecutorService` itself, so that
callers don't each have to repeat it?
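
A minimal sketch of what that could look like, not the actual Accumulo code. A plain `Map<String,String>` stands in for `AccumuloConfiguration`, and `PoolSizeSketch`, `SIZE_KEY`, `DEFAULT_SIZE`, `resolvePoolSize`, and `createScheduledExecutor` are all hypothetical names chosen for illustration:

```java
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical stand-in for ThreadPools; a Map replaces AccumuloConfiguration.
class PoolSizeSketch {

  // Illustrative key and default, not the real Accumulo property values.
  static final String SIZE_KEY = "example.timer.pool.size";
  static final String DEFAULT_SIZE = "1";

  // Fall back to the default when the property is missing or empty,
  // so callers no longer need to patch the configuration themselves.
  static int resolvePoolSize(Map<String,String> conf) {
    String size = conf.get(SIZE_KEY);
    if (size == null || size.isEmpty()) {
      size = DEFAULT_SIZE;
    }
    return Integer.parseInt(size);
  }

  // The factory applies the fallback internally.
  static ScheduledThreadPoolExecutor createScheduledExecutor(Map<String,String> conf) {
    return new ScheduledThreadPoolExecutor(resolvePoolSize(conf));
  }
}
```

With the fallback inside the factory, `startCleanupThread` could pass the supplied configuration straight through without building a `ConfigurationCopy` first.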

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -198,6 +197,8 @@ public long getTimeOut() {
 
   public TabletServerBatchWriter(ClientContext context, BatchWriterConfig config) {
     this.context = context;
+    this.executor = ThreadPools.getGeneralScheduledExecutorService(this.context.getConfiguration());
+    this.failedMutations = new FailedMutations();

Review comment:
       Just curious why the initialization for `failedMutations` was moved into 
the constructor but the other nearby inline final initializations 
(`violations`, `authorizationFailures`, `serverSideErrors`) weren't?

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.threads;
+
+import java.util.OptionalInt;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+
+public class ThreadPools {
+
+  // the number of milliseconds before we allow an unused thread to terminate.
+  public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
+
+  /**
+   * Get a thread pool based on a thread pool related property
+   *
+   * @param conf
+   *          accumulo configuration
+   * @param p
+   *          thread pool related property
+   * @return ExecutorService impl
+   * @throws RuntimeException
+   *           if property is not handled
+   */
+  public static ExecutorService getExecutorService(AccumuloConfiguration conf, Property p) {

Review comment:
       Not a super strong opinion, but when I first started reading the code I 
thought maybe these methods could be returning cached/shared executors. I 
suggest considering changing all of the methods in this class from "get*" to 
"create*" (or "new*" to match the pattern in the 
`java.util.concurrent.Executors` class) to make it explicit that they are 
always creating a resource.
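
To illustrate the naming point, here is a small sketch (hypothetical class and method names, not code from this PR) built on `java.util.concurrent.Executors`. Each call hands back a distinct pool that the caller owns and must shut down, which is what a "create*" or "new*" prefix would signal:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example; not part of the Accumulo code base.
class FactoryNamingSketch {

  // "create" makes it explicit that every call allocates a new pool.
  static ExecutorService createWorkerPool(int threads) {
    return Executors.newFixedThreadPool(threads);
  }

  // Two calls yield two independent pools; nothing is cached or shared.
  static boolean returnsDistinctPools() {
    ExecutorService a = createWorkerPool(1);
    ExecutorService b = createWorkerPool(1);
    try {
      return a != b;
    } finally {
      // The caller is responsible for releasing each pool it created.
      a.shutdown();
      b.shutdown();
    }
  }
}
```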

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/AccumuloUncaughtExceptionHandler.java
##########
@@ -16,20 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.util;
+package org.apache.accumulo.core.util.threads;
 
 import java.lang.Thread.UncaughtExceptionHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
+/**
+ * UncaughtExceptionHandler that logs all Exceptions and Errors thrown from a Thread. If an Error is
+ * thrown, halt the JVM.
+ *
+ */
+class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
 
-  private static final Logger log = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
 
   @Override
   public void uncaughtException(Thread t, Throwable e) {
-    log.error(String.format("Caught an exception in %s.  Shutting down.", t), e);
+    if (e instanceof Exception) {
+      LOG.error("Caught an Exception in {}. Thread is dead.", t, e);
+    } else if (e instanceof Error) {
+      try {
+        e.printStackTrace();
+        System.err.println("Error thrown in thread: " + t + ", halting VM.");
+      } catch (Throwable e1) {
+        // If e == OutOfMemoryError, then it's probable that another Error might be
+        // thrown when trying to print to System.err.
+      } finally {
+        Runtime.getRuntime().halt(-1);

Review comment:
       From reading the discussion here, it sounds like there's some
uncertainty about what action to take if we get here. Did Ivan or Drew ever
weigh in again? I think Christopher's arguments make sense, but I'm wondering
whether it's worth adding a configuration option to skip halting the tserver.
The main reason I suggest that is in case some unanticipated situation hits
this path and has a big impact on production systems. In such a case, halting
could at least be disabled with a configuration change rather than a
re-release and redeployment of Accumulo. I only suggest this given the
uncertainty about whether this is too severe a response to uncaught errors.
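
A rough sketch of the kind of switch I have in mind, under assumed names. `ConfigurableHaltHandlerSketch`, the `haltOnError` supplier, and the injectable `haltAction` are all hypothetical. In real code the supplier would presumably read an Accumulo property and the action would be `() -> Runtime.getRuntime().halt(-1)`:

```java
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.function.BooleanSupplier;

// Hypothetical handler; not the class in this PR.
class ConfigurableHaltHandlerSketch implements UncaughtExceptionHandler {

  private final BooleanSupplier haltOnError; // e.g. backed by a config property
  private final Runnable haltAction; // e.g. () -> Runtime.getRuntime().halt(-1)

  ConfigurableHaltHandlerSketch(BooleanSupplier haltOnError, Runnable haltAction) {
    this.haltOnError = haltOnError;
    this.haltAction = haltAction;
  }

  @Override
  public void uncaughtException(Thread t, Throwable e) {
    if (e instanceof Error && haltOnError.getAsBoolean()) {
      try {
        System.err.println("Error thrown in thread: " + t + ", halting VM.");
        e.printStackTrace();
      } catch (Throwable ignored) {
        // Printing may itself fail, for example after an OutOfMemoryError.
      } finally {
        haltAction.run();
      }
    } else {
      // Halting disabled, or a plain Exception: report it and let the thread die.
      System.err.println("Uncaught throwable in " + t + ": " + e);
    }
  }
}
```

Passing the halt action in also keeps the handler testable without actually stopping the JVM.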

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -209,20 +210,17 @@ public TabletServerBatchWriter(ClientContext context, BatchWriterConfig config)
     this.writer = new MutationWriter(config.getMaxWriteThreads());
 
     if (this.maxLatency != Long.MAX_VALUE) {
-      jtimer.schedule(new TimerTask() {
-        @Override
-        public void run() {
-          try {
-            synchronized (TabletServerBatchWriter.this) {
-              if ((System.currentTimeMillis() - lastProcessingStartTime)
-                  > TabletServerBatchWriter.this.maxLatency)
-                startProcessing();
-            }
-          } catch (Throwable t) {
-            updateUnknownErrors("Max latency task failed " + t.getMessage(), t);
+      executor.scheduleWithFixedDelay(Threads.createNamedRunnable("latency timer", () -> {

Review comment:
       I liked that the timer thread previously had `BatchWriter` in its name.
With only `latency timer`, it's a little harder to identify the thread when
looking at a thread dump, IMO.
   ```suggestion
         executor.scheduleWithFixedDelay(Threads.createNamedRunnable("BatchWriterLatencyTimer", () -> {
   ```

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.threads;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.OptionalInt;
+
+public class Threads {
+
+  public static Runnable createNamedRunnable(String name, Runnable r) {
+    return new NamedRunnable(name, r);
+  }
+
+  public static Runnable createNamedRunnable(String name, OptionalInt priority, Runnable r) {
+    return new NamedRunnable(name, priority, r);
+  }
+
+  private static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler();
+
+  public static Thread createThread(String name, Runnable r) {
+    return createThread(name, OptionalInt.empty(), r);
+  }
+
+  public static Thread createThread(String name, OptionalInt priority, Runnable r) {
+    Thread thread = null;
+    if (r instanceof NamedRunnable) {
+      NamedRunnable nr = (NamedRunnable) r;
+      thread = new Thread(r, name);
+      if (nr.getPriority().isPresent()) {
+        thread.setPriority(nr.getPriority().getAsInt());
+      } else if (priority.isPresent()) {
+        thread.setPriority(priority.getAsInt());
+      }
+    } else {
+      thread = new Thread(r, name);
+      if (priority.isPresent()) {
+        thread.setPriority(priority.getAsInt());
+      }
+    }

Review comment:
       Just a nit, but it doesn't appear that the initialization of `thread` is 
any different based on the type of Runnable passed in. Why not move the 
initialization up? Maybe something like:
   ```suggestion
       Thread thread = new Thread(r, name);
       if (r instanceof NamedRunnable) {
         NamedRunnable nr = (NamedRunnable) r;
         if (nr.getPriority().isPresent()) {
           priority = nr.getPriority();
         }
       }
       if (priority.isPresent()) {
         thread.setPriority(priority.getAsInt());
       }
   ```



