dlmarion commented on a change in pull request #2346:
URL: https://github.com/apache/accumulo/pull/2346#discussion_r745094092
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -386,15 +386,12 @@ public void run() {
fatalException = new TableDeletedException(tableId.canonical());
} catch (SampleNotPresentException e) {
fatalException = e;
- } catch (Exception t) {
+ } catch (Throwable t) {
if (queryThreadPool.isShutdown())
log.debug("Caught exception, but queryThreadPool is shutdown", t);
else
log.warn("Caught exception, but queryThreadPool is not shutdown", t);
fatalException = t;
- } catch (Throwable t) {
- fatalException = t;
- throw t; // let uncaught exception handler deal with the Error
Review comment:
The user is not providing their own thread pools in this PR. I will
review and see what happens if an exception is thrown from this iterator
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -625,19 +641,26 @@ public void run() {
private static final int MUTATION_BATCH_SIZE = 1 << 17;
private final ThreadPoolExecutor sendThreadPool;
+ private final Cleanable sendThreadPoolCleanable;
private final ThreadPoolExecutor binningThreadPool;
+ private final Cleanable binningThreadPoolCleanable;
private final Map<String,TabletServerMutations<Mutation>> serversMutations;
private final Set<String> queued;
private final Map<TableId,TabletLocator> locators;
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<>();
queued = new HashSet<>();
- sendThreadPool =
- ThreadPools.createFixedThreadPool(numSendThreads,
this.getClass().getName(), false);
+ sendThreadPool =
context.getThreadPools().newThreadPool(ThreadPoolType.BATCH_WRITER_SEND_POOL,
+ new ThreadPoolConfig(context.getConfiguration(), numSendThreads));
+ sendThreadPoolCleanable =
+ CleanerUtil.shutdownThreadPoolExecutor(sendThreadPool, () -> {},
log);
Review comment:
Take a look to at the changes I made to CleanerUtil below. The empty
runnable is a pre-shutdown function. There is one place in the code where that
is being used.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
@Override
public void close() {
threadPool.shutdownNow();
+ threadPoolCleanable.clean(); // deregister the cleaner, will not call
shutdownNow() because
+ // closed is now true
cleanupThreadPool.execute(Threads.createNamedRunnable("ConditionalWriterCleanupTask",
Review comment:
I could potentially modify it to use the
SHARED_GENERAL_SCHEDULED_TASK_POOL which is static. I could submit a fire once
task.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -411,7 +408,7 @@ public void run() {
e.setTableInfo(getTableInfo());
log.debug("{}", e.getMessage(), e);
fatalException = e;
- } catch (Exception t) {
+ } catch (Throwable t) {
Review comment:
I went back through the changes I made to the client code in #1840 and
un-did them.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -65,9 +67,9 @@
private ScannerImpl.Reporter reporter;
- private static ThreadPoolExecutor readaheadPool =
ThreadPools.createThreadPool(0,
Review comment:
If you notice here the max threads is Integer.MAX_VALUE. Now that this
is not static anymore, what should the max value be?
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -124,20 +133,26 @@ public boolean hasNext() {
throw new NoSuchElementException();
}
+ void closeThriftScanner() {
+ synchronized (scanState) {
+ // this is synchronized so its mutually exclusive with readBatch()
+ try {
+ closed = true;
+ ThriftScanner.close(scanState);
+ } catch (Exception e) {
+ LoggerFactory.getLogger(ScannerIterator.class).debug("Exception when
closing scan session",
+ e);
+ }
+ }
+ }
+
void close() {
// run actual close operation in the background so this does not block.
readaheadPool.execute(() -> {
- synchronized (scanState) {
- // this is synchronized so its mutually exclusive with readBatch()
- try {
- closed = true;
- ThriftScanner.close(scanState);
- } catch (Exception e) {
- LoggerFactory.getLogger(ScannerIterator.class)
- .debug("Exception when closing scan session", e);
- }
- }
+ closeThriftScanner();
Review comment:
poolCloser is a reference to the shared scheduled executor pool
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
@Override
public void close() {
threadPool.shutdownNow();
+ threadPoolCleanable.clean(); // deregister the cleaner, will not call
shutdownNow() because
+ // closed is now true
cleanupThreadPool.execute(Threads.createNamedRunnable("ConditionalWriterCleanupTask",
Review comment:
Addressed in 918b20f. CleanupThreadPool is now a reference to the shared
scheduled executor pool.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
@Override
public void close() {
threadPool.shutdownNow();
+ threadPoolCleanable.clean(); // deregister the cleaner, will not call
shutdownNow() because
+ // closed is now true
Review comment:
See the changes I made to CleanerUtil below. The empty runnable is a
preshutdown function, which is not needed here. It still shuts down the
threadpool.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -65,9 +67,9 @@
private ScannerImpl.Reporter reporter;
- private static ThreadPoolExecutor readaheadPool =
ThreadPools.createThreadPool(0,
Review comment:
I was surprised by that also, although the max is rather large.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]