dlmarion commented on a change in pull request #2346:
URL: https://github.com/apache/accumulo/pull/2346#discussion_r745094092



##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -386,15 +386,12 @@ public void run() {
           fatalException = new TableDeletedException(tableId.canonical());
       } catch (SampleNotPresentException e) {
         fatalException = e;
-      } catch (Exception t) {
+      } catch (Throwable t) {
         if (queryThreadPool.isShutdown())
           log.debug("Caught exception, but queryThreadPool is shutdown", t);
         else
           log.warn("Caught exception, but queryThreadPool is not shutdown", t);
         fatalException = t;
-      } catch (Throwable t) {
-        fatalException = t;
-        throw t; // let uncaught exception handler deal with the Error

Review comment:
       The user is not providing their own thread pools in this PR. I will 
review and see what happens if an exception is thrown from this iterator.
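
For readers following the hunk above, here is a minimal sketch of the pattern under discussion: a worker catches `Throwable`, records it in a field, and the consuming thread surfaces it later. The class and method names (`FatalExceptionSketch`, `guarded`, `checkForFailure`) are hypothetical and are not taken from `TabletServerBatchReaderIterator`; how the real iterator rethrows the stored exception is not shown in this diff, so the wrapping in `RuntimeException` is an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FatalExceptionSketch {
  // Written by the worker thread, read by the consuming thread.
  private volatile Throwable fatalException;

  // Wraps a task so that any Throwable (including Errors) is recorded
  // instead of propagating to the thread's uncaught exception handler.
  Runnable guarded(Runnable work) {
    return () -> {
      try {
        work.run();
      } catch (Throwable t) {
        fatalException = t;
      }
    };
  }

  // Called by the consumer (for example from hasNext()) to surface the failure.
  // Assumption: the stored Throwable is wrapped in a RuntimeException.
  void checkForFailure() {
    Throwable t = fatalException;
    if (t != null) {
      throw new RuntimeException(t);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FatalExceptionSketch reader = new FatalExceptionSketch();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(reader.guarded(() -> {
      throw new StackOverflowError("simulated");
    }));
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    try {
      reader.checkForFailure();
    } catch (RuntimeException e) {
      System.out.println("surfaced to caller: " + e.getCause());
    }
  }
}
```

The trade-off is that an `Error` no longer reaches the uncaught exception handler, which is what the removed `throw t` branch in the hunk was for; it is only seen if the consumer checks the field.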

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -625,19 +641,26 @@ public void run() {
 
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
     private final ThreadPoolExecutor sendThreadPool;
+    private final Cleanable sendThreadPoolCleanable;
     private final ThreadPoolExecutor binningThreadPool;
+    private final Cleanable binningThreadPoolCleanable;
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<TableId,TabletLocator> locators;
 
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<>();
       queued = new HashSet<>();
-      sendThreadPool =
-          ThreadPools.createFixedThreadPool(numSendThreads, this.getClass().getName(), false);
+      sendThreadPool = context.getThreadPools().newThreadPool(ThreadPoolType.BATCH_WRITER_SEND_POOL,
+          new ThreadPoolConfig(context.getConfiguration(), numSendThreads));
+      sendThreadPoolCleanable =
+          CleanerUtil.shutdownThreadPoolExecutor(sendThreadPool, () -> {}, log);

Review comment:
       Take a look at the changes I made to CleanerUtil below. The empty 
runnable is a pre-shutdown function. There is one place in the code where it 
is used.
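
As a rough illustration of the idea in this comment, the sketch below registers a cleaning action that runs a pre-shutdown hook and then shuts a pool down, using `java.lang.ref.Cleaner` from the JDK. It is not the `CleanerUtil` code from this PR: `PoolCleanerSketch`, `shutdownOnClean`, and the `owner` parameter are hypothetical names, and the real method's behavior may differ.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class PoolCleanerSketch {
  private static final Cleaner CLEANER = Cleaner.create();

  // Hypothetical helper: when 'owner' becomes unreachable, or when clean() is
  // called explicitly, run the pre-shutdown hook and then stop the pool.
  // The action must not capture 'owner', or the owner could never be collected.
  static Cleaner.Cleanable shutdownOnClean(Object owner, ExecutorService pool,
      Runnable preShutdown) {
    return CLEANER.register(owner, () -> {
      if (!pool.isShutdown()) {
        preShutdown.run();
        pool.shutdownNow();
      }
    });
  }

  public static void main(String[] args) {
    Object owner = new Object();
    ExecutorService pool = Executors.newFixedThreadPool(1);
    AtomicBoolean hookRan = new AtomicBoolean(false);

    Cleaner.Cleanable cleanable = shutdownOnClean(owner, pool, () -> hookRan.set(true));

    // Explicit clean() runs the action at most once and deregisters it.
    cleanable.clean();
    System.out.println("pool shut down: " + pool.isShutdown() + ", hook ran: " + hookRan.get());
  }
}
```

Passing `() -> {}` as the hook, as the diff does, simply means there is nothing to do before the pool is shut down.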

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
   @Override
   public void close() {
     threadPool.shutdownNow();
+    threadPoolCleanable.clean(); // deregister the cleaner, will not call shutdownNow() because
+                                 // closed is now true
     cleanupThreadPool.execute(Threads.createNamedRunnable("ConditionalWriterCleanupTask",

Review comment:
       I could potentially modify it to use the static 
SHARED_GENERAL_SCHEDULED_TASK_POOL and submit a fire-once task to it.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -411,7 +408,7 @@ public void run() {
               e.setTableInfo(getTableInfo());
               log.debug("{}", e.getMessage(), e);
               fatalException = e;
-            } catch (Exception t) {
+            } catch (Throwable t) {

Review comment:
       I went back through the changes I made to the client code in #1840 and 
reverted them.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -65,9 +67,9 @@
 
   private ScannerImpl.Reporter reporter;
 
-  private static ThreadPoolExecutor readaheadPool = ThreadPools.createThreadPool(0,

Review comment:
       Notice that the max thread count here is Integer.MAX_VALUE. Now that 
this pool is no longer static, what should the max value be?
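
To make the question concrete, here is a small sketch using only `java.util.concurrent`, not Accumulo's `ThreadPools` helper. It assumes a pool with zero core threads and a `SynchronousQueue`, which is how the JDK's cached thread pool is built; whether the readahead pool uses the same queue is an assumption, and `ReadaheadPoolSketch` and `newPool` are hypothetical names. With that shape, lowering the max from `Integer.MAX_VALUE` means tasks beyond the limit are rejected rather than queued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReadaheadPoolSketch {
  // Zero core threads, at most maxThreads, idle threads retire after 3 seconds.
  // A SynchronousQueue holds no tasks: each task needs a free or new thread.
  static ThreadPoolExecutor newPool(int maxThreads) {
    return new ThreadPoolExecutor(0, maxThreads, 3, TimeUnit.SECONDS,
        new SynchronousQueue<>());
  }

  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = newPool(2);
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    pool.execute(blocker); // starts thread 1
    pool.execute(blocker); // starts thread 2
    boolean rejected = false;
    try {
      pool.execute(blocker); // no idle thread and max reached
    } catch (RejectedExecutionException e) {
      rejected = true;
    }
    System.out.println("third task rejected: " + rejected);

    release.countDown();
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

So a finite max would need to be paired with either a bounded queue or a rejection policy that the scanner can tolerate.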

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -124,20 +133,26 @@ public boolean hasNext() {
     throw new NoSuchElementException();
   }
 
+  void closeThriftScanner() {
+    synchronized (scanState) {
+      // this is synchronized so its mutually exclusive with readBatch()
+      try {
+        closed = true;
+        ThriftScanner.close(scanState);
+      } catch (Exception e) {
+        LoggerFactory.getLogger(ScannerIterator.class).debug("Exception when closing scan session",
+            e);
+      }
+    }
+  }
+
   void close() {
     // run actual close operation in the background so this does not block.
     readaheadPool.execute(() -> {
-      synchronized (scanState) {
-        // this is synchronized so its mutually exclusive with readBatch()
-        try {
-          closed = true;
-          ThriftScanner.close(scanState);
-        } catch (Exception e) {
-          LoggerFactory.getLogger(ScannerIterator.class)
-              .debug("Exception when closing scan session", e);
-        }
-      }
+      closeThriftScanner();

Review comment:
       poolCloser is a reference to the shared scheduled executor pool

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
   @Override
   public void close() {
     threadPool.shutdownNow();
+    threadPoolCleanable.clean(); // deregister the cleaner, will not call shutdownNow() because
+                                 // closed is now true
     cleanupThreadPool.execute(Threads.createNamedRunnable("ConditionalWriterCleanupTask",

Review comment:
       Addressed in 918b20f. cleanupThreadPool is now a reference to the shared 
scheduled executor pool.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
   @Override
   public void close() {
     threadPool.shutdownNow();
+    threadPoolCleanable.clean(); // deregister the cleaner, will not call shutdownNow() because
+                                 // closed is now true

Review comment:
       See the changes I made to CleanerUtil below. The empty runnable is a 
pre-shutdown function, which is not needed here. The cleaner still shuts down 
the thread pool.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -65,9 +67,9 @@
 
   private ScannerImpl.Reporter reporter;
 
-  private static ThreadPoolExecutor readaheadPool = ThreadPools.createThreadPool(0,

Review comment:
       I was surprised by that also, although the max is rather large.





