ctubbsii commented on a change in pull request #2346:
URL: https://github.com/apache/accumulo/pull/2346#discussion_r745000366
##########
File path:
core/src/test/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderTest.java
##########
@@ -34,7 +35,11 @@
@Before
public void setup() {
- context = EasyMock.createMock(ClientContext.class);
+ context = EasyMock.createStrictMock(ClientContext.class);
+ AccumuloConfiguration conf =
EasyMock.createStrictMock(AccumuloConfiguration.class);
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.expect(context.getThreadPools()).andReturn(new
ClientThreadPools()).anyTimes();
+ EasyMock.replay(context);
Review comment:
EasyMock methods are good candidates for using static imports, just
because they make the tests slightly more readable without losing anything.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
##########
@@ -716,6 +718,9 @@ public void close() {
thriftTransportPool.shutdown();
}
singletonReservation.close();
+ if (threadPools != null) {
+ threadPools.close();
+ }
Review comment:
`threadPools` is `final` and set explicitly to a new object in the
constructor, so it can never be null.
```suggestion
threadPools.close();
```
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -703,7 +726,7 @@ void queueMutations(final MutationSet mutationsToSend) {
log.trace("{} - binning {} mutations",
Thread.currentThread().getName(),
mutationsToSend.size());
addMutations(mutationsToSend);
- } catch (Exception e) {
+ } catch (Throwable e) {
updateUnknownErrors("Error processing mutation set", e);
Review comment:
Catching Throwable here seems counterproductive to the goal of this...
which I understand is to give control to the calling code that provides the
thread pool when an error occurs. We can't do that if we're swallowing the
errors instead of letting the user specify their own uncaught exception handler
on their thread pool.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
@Override
public void close() {
threadPool.shutdownNow();
+ threadPoolCleanable.clean(); // deregister the cleaner, will not call
shutdownNow() because
+ // closed is now true
Review comment:
The comment does not appear to be correct. The cleaner will not call
shutdownNow because the closed is now true, it will not call shutdownNow
because it was passed a NOOP runnable. It's not passed any kind of Closeable to
be able to detect whether closed is true now or not. It's doing nothing solely
because it was only given a NOOP runnable.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -513,10 +522,17 @@ private synchronized void updateServerErrors(String
server, Exception e) {
log.error("Server side error on {}", server, e);
}
- private synchronized void updateUnknownErrors(String msg, Exception t) {
+ private synchronized void updateUnknownErrors(String msg, Throwable t) {
somethingFailed = true;
unknownErrors++;
- this.lastUnknownError = t;
+ // Multiple errors may occur between the time checkForFailures() is called
+ // by the client. Be sure to return an Error if one (or more) occurred.
+ // Set lastUnknownError if it's null, to an Error, or to an Exception if
it's not already an
+ // Error
+ if (this.lastUnknownError == null
+ || !(t instanceof Exception && this.lastUnknownError instanceof
Error)) {
Review comment:
Is this trying to store the most severe of the problems? (Error
preferred, then Exception, then null)?
It's probably still a good idea to keep the old problems using
`addSuppressed`.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientThreadPools.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+public class ClientThreadPools {
+
+ public static class ThreadPoolConfig {
Review comment:
I like this class being consolidated, for the most part, but it's not
clear how having this consolidated here allows users to be able to have greater
control over the thread pool lifecycles/error handling, which is what I thought
was part of the goal. I don't see it exposed in the public API anywhere.
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -625,19 +641,26 @@ public void run() {
private static final int MUTATION_BATCH_SIZE = 1 << 17;
private final ThreadPoolExecutor sendThreadPool;
+ private final Cleanable sendThreadPoolCleanable;
private final ThreadPoolExecutor binningThreadPool;
+ private final Cleanable binningThreadPoolCleanable;
private final Map<String,TabletServerMutations<Mutation>> serversMutations;
private final Set<String> queued;
private final Map<TableId,TabletLocator> locators;
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<>();
queued = new HashSet<>();
- sendThreadPool =
- ThreadPools.createFixedThreadPool(numSendThreads,
this.getClass().getName(), false);
+ sendThreadPool =
context.getThreadPools().newThreadPool(ThreadPoolType.BATCH_WRITER_SEND_POOL,
+ new ThreadPoolConfig(context.getConfiguration(), numSendThreads));
+ sendThreadPoolCleanable =
+ CleanerUtil.shutdownThreadPoolExecutor(sendThreadPool, () -> {},
log);
Review comment:
This seems to register a cleaner that does nothing to attempt to
shutdown the thread pool. It seems odd to have a cleaner that doesn't do any
cleaning, and just does a NOOP (`() -> {}`)
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientThreadPools.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+public class ClientThreadPools {
+
+ public static class ThreadPoolConfig {
+
+ public static final ThreadPoolConfig EMPTY_CONFIG =
+ new ThreadPoolConfig(Optional.empty(), Optional.empty(),
Optional.empty());
+
+ private final Optional<Iterable<Entry<String,String>>> configuration;
+ private final Optional<Integer> numThreads;
+ private final Optional<String> threadName;
+
+ public ThreadPoolConfig(Iterable<Entry<String,String>> configuration) {
+ this(Optional.of(configuration), Optional.empty(), Optional.empty());
+ }
+
+ public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int
numThreads) {
+ this(Optional.of(configuration), Optional.of(numThreads),
Optional.empty());
+ }
+
+ public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int
numThreads,
+ String threadName) {
+ this(Optional.of(configuration), Optional.of(numThreads),
Optional.of(threadName));
+ }
+
+ private ThreadPoolConfig(Optional<Iterable<Entry<String,String>>>
configuration,
+ Optional<Integer> numThreads, Optional<String> threadName) {
+ this.configuration = configuration;
+ this.numThreads = numThreads;
+ this.threadName = threadName;
+ }
+
+ public Optional<Iterable<Entry<String,String>>> getConfiguration() {
+ return configuration;
+ }
+
+ public Optional<Integer> getNumThreads() {
+ return numThreads;
+ }
+
+ public Optional<String> getThreadName() {
+ return threadName;
+ }
+ }
+
+ public static enum ThreadPoolType {
+ /**
+ * ThreadPoolExecutor that runs bulk import tasks
+ */
+ BULK_IMPORT_POOL,
+ /**
+ * ThreadPoolExecutor that runs tasks to contact Compactors to get running
compaction
+ * information
+ */
+ ACTIVE_EXTERNAL_COMPACTION_POOL,
+ /**
+ * ThreadPoolExecutor used for fetching data from the TabletServers
+ */
+ SCANNER_READ_AHEAD_POOL,
+ /**
+ * ThreadPoolExecutor used for adding splits to a table
+ */
+ ADD_SPLITS_THREAD_POOL,
+ /**
+ * ThreadPoolExecutor used for fetching data from the TabletServers
+ */
+ BATCH_SCANNER_READ_AHEAD_POOL,
+ /**
+ * ThreadPoolExecutor that runs the tasks of binning mutations
+ */
+ BATCH_WRITER_BINNING_POOL,
+ /**
+ * ThreadPoolExecutor that runs the tasks of sending mutations to
TabletServers
+ */
+ BATCH_WRITER_SEND_POOL,
+ /**
+ * ThreadPoolExecutor that runs clean up tasks when close is called on the
ConditionalWriter
+ */
+ CONDITIONAL_WRITER_CLEANUP_TASK_POOL,
+ /**
+ * ThreadPoolExecutor responsible for loading bloom filters
+ */
+ BLOOM_FILTER_LAYER_LOADER_POOL
+ }
+
+ public static enum ScheduledThreadPoolType {
+ /**
+ * shared scheduled executor for trivial tasks
+ */
+ SHARED_GENERAL_SCHEDULED_TASK_POOL,
+ /**
+ * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet
the users latency
+ * goals.
+ */
+ BATCH_WRITER_LATENCY_TASK_POOL,
+ /**
+ * ScheduledThreadPoolExecutor that periodically runs tasks to handle
failed write mutations and
+ * send mutations to TabletServers
+ */
+ CONDITIONAL_WRITER_RETRY_POOL
+ }
+
+ private ScheduledThreadPoolExecutor sharedScheduledThreadPool = null;
+
+ public ThreadPoolExecutor newThreadPool(ThreadPoolType usage,
ThreadPoolConfig config) {
+ switch (usage) {
+ case BULK_IMPORT_POOL:
+ Objects.requireNonNull(config.getNumThreads().get(), "Number of
threads must be set");
Review comment:
Some of these lines might be shorter and read more easily if you
statically import requireNonNull
##########
File path:
core/src/main/java/org/apache/accumulo/core/util/threads/AccumuloUncaughtExceptionHandler.java
##########
@@ -44,7 +46,10 @@ public void uncaughtException(Thread t, Throwable e) {
// If e == OutOfMemoryError, then it's probably that another Error
might be
// thrown when trying to print to System.err.
} finally {
- Runtime.getRuntime().halt(-1);
+ Mode m = SingletonManager.getMode();
+ if (m != null && m.equals(Mode.SERVER)) {
Review comment:
The mode should never be null. Also, you should use `==` for comparing
enums, not `.equals`
```suggestion
if (SingletonManager.getMode() == Mode.SERVER) {
```
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -386,15 +386,12 @@ public void run() {
fatalException = new TableDeletedException(tableId.canonical());
} catch (SampleNotPresentException e) {
fatalException = e;
- } catch (Exception t) {
+ } catch (Throwable t) {
if (queryThreadPool.isShutdown())
log.debug("Caught exception, but queryThreadPool is shutdown", t);
else
log.warn("Caught exception, but queryThreadPool is not shutdown", t);
fatalException = t;
- } catch (Throwable t) {
- fatalException = t;
- throw t; // let uncaught exception handler deal with the Error
Review comment:
I like the simplicity of this earlier solution. If the user is providing
their own thread pools, why wouldn't they be able to provide their own uncaught
exception handler to receive these and handle them on their own, if they wish?
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -124,20 +133,26 @@ public boolean hasNext() {
throw new NoSuchElementException();
}
+ void closeThriftScanner() {
+ synchronized (scanState) {
+ // this is synchronized so its mutually exclusive with readBatch()
+ try {
+ closed = true;
+ ThriftScanner.close(scanState);
+ } catch (Exception e) {
+ LoggerFactory.getLogger(ScannerIterator.class).debug("Exception when
closing scan session",
+ e);
+ }
+ }
+ }
+
void close() {
// run actual close operation in the background so this does not block.
readaheadPool.execute(() -> {
- synchronized (scanState) {
- // this is synchronized so its mutually exclusive with readBatch()
- try {
- closed = true;
- ThriftScanner.close(scanState);
- } catch (Exception e) {
- LoggerFactory.getLogger(ScannerIterator.class)
- .debug("Exception when closing scan session", e);
- }
- }
+ closeThriftScanner();
});
+ readaheadPoolCleanable.clean();
+ this.poolCloser.execute(() -> readaheadPool.shutdownNow());
Review comment:
Why is a separate thread pool being used to shut down this thread pool
instead of the cleaner doing that?
##########
File path:
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientThreadPools.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+public class ClientThreadPools {
+
+ public static class ThreadPoolConfig {
+
+ public static final ThreadPoolConfig EMPTY_CONFIG =
+ new ThreadPoolConfig(Optional.empty(), Optional.empty(),
Optional.empty());
+
+ private final Optional<Iterable<Entry<String,String>>> configuration;
Review comment:
Because this is an internal class only now, it might be more readable to
just use the internal class, AccumuloConfiguration instead of the Iterable
interface type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]