kevinrr888 commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1941675846
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -256,21 +256,24 @@ protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) {
RowFateStatusFilter.configureScanner(scanner, statuses);
TxColumnFamily.STATUS_COLUMN.fetch(scanner);
TxColumnFamily.RESERVATION_COLUMN.fetch(scanner);
+ TxInfoColumnFamily.TX_NAME_COLUMN.fetch(scanner);
return scanner.stream().onClose(scanner::close).map(e -> {
String txUUIDStr = e.getKey().getRow().toString();
FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
SortedMap<Key,Value> rowMap;
TStatus status = TStatus.UNKNOWN;
FateReservation reservation = null;
+ Fate.FateOperation fateOp = null;
Review Comment:
My implementation renames `txName` to `fateOp` in the places where I write new
code. Ignore the discrepancy in naming: they refer to the same thing and will be
consolidated in a later PR via https://github.com/apache/accumulo/issues/5230.
##########
test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java:
##########
@@ -249,30 +249,32 @@ private void testMultipleFateInstances(TestStoreFactory<SleepingTestEnv> testSto
Fate<SleepingTestEnv> fate2 =
new Fate<>(testEnv2, store2, false, Object::toString,
DefaultConfiguration.getInstance());
- for (int i = 0; i < numFateIds; i++) {
- FateId fateId;
- // Start half the txns using fate1, and the other half using fate2
- if (i % 2 == 0) {
- fateId = fate1.startTransaction();
- fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test");
- } else {
- fateId = fate2.startTransaction();
- fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test");
+ try {
Review Comment:
I moved shutdown() into a finally block for the tests in this class so that a
failure in one test doesn't affect the others. This should have already existed
in these tests.
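For reference, a minimal sketch of the pattern, assuming `Fate#shutdown(long, TimeUnit)` as the cleanup call (the helper and its arguments are illustrative, not the exact test code):

```java
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.fate.Fate;

class ShutdownInFinallySketch {
  // Whatever the test body does, both Fate instances are shut down in a finally
  // block so a failed assertion cannot leak fate threads into later tests.
  static <T> void runThenShutdown(Fate<T> fate1, Fate<T> fate2, Runnable testBody) {
    try {
      testBody.run(); // e.g. start/seed transactions and assert on their progress
    } finally {
      // assumed shutdown signature; waits for running transaction runners to stop
      fate1.shutdown(1, TimeUnit.MINUTES);
      fate2.shutdown(1, TimeUnit.MINUTES);
    }
  }
}
```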
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -144,260 +140,175 @@ public TFateOperation toThrift() {
}
return top;
}
+
+ public static Set<FateOperation> getAllUserFateOps() {
+ return allUserFateOps;
+ }
+
+ public static Set<FateOperation> getAllMetaFateOps() {
+ return allMetaFateOps;
+ }
}
- /**
- * A single thread that finds transactions to work on and queues them up. Do not want each worker
- * thread going to the store and looking for work as it would place more load on the store.
- */
- private class WorkFinder implements Runnable {
+ // The fate pools watcher:
+ // - Maintains a TransactionRunner per available thread per pool/FateExecutor. Does so by
+ // periodically checking the pools for an inactive thread (i.e., a thread running a
+ // TransactionRunner died or the pool size was increased in the property), resizing the pool and
+ // submitting new runners as needed. Also safely stops the necessary number of TransactionRunners
+ // if the pool size in the property was decreased.
+ // - Warns the user to consider increasing the pool size (or splitting the fate ops assigned to
+ // that pool into separate pools) for any pool that does not often have any idle threads.
+ private class FatePoolsWatcher implements Runnable {
+ private final T environment;
+ private final AccumuloConfiguration conf;
+
+ private FatePoolsWatcher(T environment, AccumuloConfiguration conf) {
+ this.environment = environment;
+ this.conf = conf;
+ }
@Override
public void run() {
- while (keepRunning.get()) {
- try {
- store.runnable(keepRunning, fateId -> {
- while (keepRunning.get()) {
- try {
- // The reason for calling transfer instead of queueing is avoid rescanning the
- // storage layer and adding the same thing over and over. For example if all threads
- // were busy, the queue size was 100, and there are three runnable things in the
- // store. Do not want to keep scanning the store adding those same 3 runnable things
- // until the queue is full.
- if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) {
- break;
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
+ // Read from the config here and here only. Must avoid reading the same property from the
+ // config more than once since it can change at any point in this execution
+ var poolConfigs = getPoolConfigurations(conf);
+ var idleCheckIntervalMillis =
conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
+
+ // shutdown task: shutdown fate executors whose set of fate operations are no longer present
+ // in the config
+ synchronized (fateExecutors) {
+ final var fateExecutorsIter = fateExecutors.iterator();
+ while (fateExecutorsIter.hasNext()) {
+ var fateExecutor = fateExecutorsIter.next();
+
+ // if this fate executors set of fate ops is no longer present in the config...
+ if (!poolConfigs.containsKey(fateExecutor.getFateOps())) {
+ if (!fateExecutor.isShutdown()) {
+ log.debug("The config for {} has changed invalidating {}.
Gracefully shutting down "
+ + "the FateExecutor.", getFateConfigProp(), fateExecutor);
+ fateExecutor.initiateShutdown();
+ } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
+ log.debug("{} has been shutdown, but is still actively working
on transactions.",
+ fateExecutor);
+ } else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) {
+ log.debug("{} has been shutdown and all threads have safely
terminated.",
+ fateExecutor);
+ fateExecutorsIter.remove();
}
- });
- } catch (Exception e) {
- if (keepRunning.get()) {
- log.warn("Failure while attempting to find work for fate", e);
- } else {
- log.debug("Failure while attempting to find work for fate", e);
}
-
- workQueue.clear();
}
}
- }
- }
-
- private class TransactionRunner implements Runnable {
- // used to signal a TransactionRunner to stop in the case where there are too many running
- // i.e., the property for the pool size decreased and we have excess TransactionRunners
- private final AtomicBoolean stop = new AtomicBoolean(false);
- private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
- while (keepRunning.get() && !stop.get()) {
- FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);
-
- if (unreservedFateId == null) {
- continue;
- }
- var optionalopStore = store.tryReserve(unreservedFateId);
- if (optionalopStore.isPresent()) {
- return optionalopStore;
+ // replacement task: at this point, the existing FateExecutors that were invalidated by the
+ // config changes have started shutdown or finished shutdown. Now create any new replacement
+ // FateExecutors needed
+ for (var poolConfig : poolConfigs.entrySet()) {
+ var configFateOps = poolConfig.getKey();
+ var configPoolSize = poolConfig.getValue();
+ synchronized (fateExecutors) {
+ if (fateExecutors.stream().map(FateExecutor::getFateOps)
+ .noneMatch(fo -> fo.equals(configFateOps))) {
+ fateExecutors
+ .add(new FateExecutor<>(Fate.this, environment, configFateOps, configPoolSize));
+ }
}
}
- return Optional.empty();
- }
-
- @Override
- public void run() {
- runningTxRunners.add(this);
- try {
- while (keepRunning.get() && !stop.get()) {
- FateTxStore<T> txStore = null;
- ExecutionState state = new ExecutionState();
- try {
- var optionalopStore = reserveFateTx();
- if (optionalopStore.isPresent()) {
- txStore = optionalopStore.orElseThrow();
- } else {
- continue;
- }
- state.status = txStore.getStatus();
- state.op = txStore.top();
- if (state.status == FAILED_IN_PROGRESS) {
- processFailed(txStore, state.op);
- } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
+ // resize task: For each fate executor, resize the pool to match the config as necessary and
+ // submit new TransactionRunners if the pool grew, stop TransactionRunners if the pool
+ // shrunk, and potentially suggest resizing the pool if the load is consistently high.
+ synchronized (fateExecutors) {
+ for (var fateExecutor : fateExecutors) {
+ if (fateExecutor.isShutdown()) {
+ continue;
+ }
+ final var pool = fateExecutor.getTransactionExecutor();
+ final var poolName = fateExecutor.getPoolName();
+ final var runningTxRunners = fateExecutor.getRunningTxRunners();
+ final int configured = poolConfigs.get(fateExecutor.getFateOps());
+ ThreadPools.resizePool(pool, () -> configured, poolName);
Review Comment:
I have not yet explored changing this into a cached thread pool (discussed in the
https://github.com/apache/accumulo/pull/5263#discussion_r1917457670 thread).
The changes were already pretty large and I didn't want to introduce more.
Writing this as a reminder to consider it as a potential follow-on.
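Purely as a sketch of what that follow-on might look like (the bounds, queue choice, and names here are assumptions, not part of this PR):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FatePoolSizingSketch {
  // Roughly the current approach: a fixed-size pool whose core/max size is kept
  // in sync with the configured value (here via the constructor; the PR adjusts
  // a live pool with ThreadPools.resizePool).
  static ThreadPoolExecutor fixedPool(int configured) {
    return new ThreadPoolExecutor(configured, configured, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
  }

  // The cached-pool alternative: threads are created on demand up to the
  // configured maximum and idle threads time out, so shrinking the config would
  // not require explicitly stopping excess TransactionRunners. Note that with a
  // SynchronousQueue, submissions are rejected once all threads are busy, so a
  // rejection policy or bounded queue would still need thought.
  static ThreadPoolExecutor cachedPool(int maxConfigured) {
    return new ThreadPoolExecutor(0, maxConfigured, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<>());
  }
}
```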
##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
+import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TransferQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.util.ShutdownUtil;
+import org.apache.accumulo.core.util.Timer;
+import org.apache.accumulo.core.util.threads.ThreadPoolNames;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles finding and working on FATE work. Only finds/works on fate operations that it is assigned
+ * to work on defined by 'fateOps'
+ */
+public class FateExecutor<T> {
+ private static final Logger log = LoggerFactory.getLogger(FateExecutor.class);
+ private final Logger runnerLog = LoggerFactory.getLogger(TransactionRunner.class);
+
+ private final T environment;
+ private final Fate<T> fate;
+ private final Thread workFinder;
+ private final TransferQueue<FateId> workQueue;
+ private final String poolName;
+ private final ThreadPoolExecutor transactionExecutor;
+ private final Set<TransactionRunner> runningTxRunners;
+ private final Set<Fate.FateOperation> fateOps;
+ private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();
+
+ public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps, int poolSize) {
+ final String operatesOn = fate.getStore().type().name().toLowerCase() + "."
+ + fateOps.stream().map(fo -> fo.name().toLowerCase()).collect(Collectors.joining("."));
+ final String transactionRunnerPoolName =
+ ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + operatesOn;
+ final String workFinderThreadName = "fate.work.finder." + operatesOn;
Review Comment:
Example pool name:
`accumulo.pool.manager.fate.meta.table_split.system_split.table_delete_range.table_merge`
This is long, but descriptive
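To make that concrete, a small illustrative recreation of how the name is built (example values only; the real construction is in the diff above, using `ThreadPoolNames.MANAGER_FATE_POOL_PREFIX`):

```java
import java.util.List;
import java.util.stream.Collectors;

class PoolNameSketch {
  public static void main(String[] args) {
    // store type and fate ops taken from the example name above
    String storeType = "META"; // fate.getStore().type().name()
    List<String> fateOps =
        List.of("TABLE_SPLIT", "SYSTEM_SPLIT", "TABLE_DELETE_RANGE", "TABLE_MERGE");
    String operatesOn = storeType.toLowerCase() + "."
        + fateOps.stream().map(String::toLowerCase).collect(Collectors.joining("."));
    // prints: accumulo.pool.manager.fate.meta.table_split.system_split.table_delete_range.table_merge
    System.out.println("accumulo.pool.manager.fate." + operatesOn);
  }
}
```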
##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -429,15 +429,34 @@ public enum Property {
MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval", "60s",
PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper to update interval.",
"1.9.3"),
- MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64", PropertyType.COUNT,
- "The number of threads used to run fault-tolerant executions (FATE)."
- + " These are primarily table operations like merge.",
- "1.4.3"),
+ MANAGER_USER_FATE_CONFIG("manager.user.fate.config", "{"
+ + "\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER\": 1,"
+ + "\"TABLE_BULK_IMPORT2\": 2,"
+ + "\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+ + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT,TABLE_CLONE,TABLE_IMPORT,TABLE_EXPORT\": 2"
+ + "}", PropertyType.USER_FATE_CONFIG,
+ "The number of threads used to run user-initiated fault-tolerant "
+ + "executions (FATE). These are primarily table operations like
merge. Each key/value "
+ + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
+ + "more FATE operations and each value is the number of threads that
will be assigned "
+ + "to the pool.",
+ "4.0.0"),
+ MANAGER_META_FATE_CONFIG("manager.meta.fate.config",
+ "{\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
+ + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT\": 2}",
+ PropertyType.META_FATE_CONFIG,
+ "The number of threads used to run system-initiated fault-tolerant "
+ + "executions (FATE). These are primarily table operations like
merge. Each key/value "
+ + "of the provided JSON corresponds to one thread pool. Each key is
a list of one or "
+ + "more FATE operations and each value is the number of threads that
will be assigned "
+ + "to the pool.",
+ "4.0.0"),
Review Comment:
Several comments here:
- This is my best attempt at determining which fate ops are valid META fate ops. I may
have excluded some that should have been included, or vice-versa.
- What is the proper way to deprecate the old MANAGER_FATE_THREADPOOL_SIZE property?
- These default pool sizes and default pools probably aren't optimal. Input on how the
FATE ops should be split and how many threads each pool should get would be appreciated.
This was just my best attempt.
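To make the expected value shape concrete, here is a small hedged sketch of how such a JSON value maps groups of fate ops to pool sizes (illustrative parsing only; this is not the new PropertyType validation code):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

class FateConfigSketch {
  public static void main(String[] args) {
    // example value in the shape of manager.meta.fate.config: each key is a
    // comma-separated list of fate ops and each value is that pool's thread count
    String json = "{\"TABLE_COMPACT,TABLE_CANCEL_COMPACT,COMMIT_COMPACTION\": 4,"
        + "\"TABLE_MERGE,TABLE_DELETE_RANGE,TABLE_SPLIT,SYSTEM_SPLIT\": 2}";

    Map<String,Integer> raw =
        new Gson().fromJson(json, new TypeToken<Map<String,Integer>>() {}.getType());

    // split each key into the set of fate ops that share one pool
    Map<Set<String>,Integer> pools = new HashMap<>();
    raw.forEach(
        (ops, threads) -> pools.put(new HashSet<>(Arrays.asList(ops.split(","))), threads));

    pools.forEach((ops, threads) -> System.out.println(threads + " threads for " + ops));
  }
}
```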
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]