keith-turner commented on code in PR #5301: URL: https://github.com/apache/accumulo/pull/5301#discussion_r1956684831
########## test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherIT.java: ##########

@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.util.Wait;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests the functionality of the FATE pools watcher task
+ */
+public abstract class FatePoolsWatcherIT extends SharedMiniClusterBase

Review Comment:
These are really nice tests. Not sure if it's covered, but another test that may be useful is one that ensures things are being partitioned correctly among the pools. The following is a rough stab at doing this.

1. Create FateExecutor_1 w/ 2 threads
2. Create FateExecutor_2 w/ 3 threads
3. Create FateExecutor_3 w/ 4 threads
4. Create FateExecutor_4 w/ 5 threads
5. Seed 5 things to run on FateExecutor_1
6. Seed 6 things to run on FateExecutor_2
7. Seed 1 thing to run on FateExecutor_3
8. Seed 4 things to run on FateExecutor_4

Should eventually see 2 running on FateExecutor_1, 3 running on FateExecutor_2, 1 running on FateExecutor_3, and 4 running on FateExecutor_4. So for each executor we should see `min(configuredThreads, seededTasks)` actually running. This gives some executors more work than they have threads and some less at the same time, hoping to pick up on cases where an executor snags some work it's not supposed to. I tried to pick numbers that make the inputs to min() unique and the result of min() unique.

The existing tests seem to create the exact number of tasks as threads in most cases. When the existing tests create more tasks than threads, it seems to be only for one executor. I was attempting to have multiple executors w/ different thread counts and different numbers of tasks that differ from the number of threads for the target executor.
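As a purely illustrative aside, the steady state being described can be sketched with plain JDK thread pools; this is not the PR's FateExecutor or FATE seeding API, and the class and pool names below are made up. Four pools with 2/3/4/5 threads are each seeded with 5/6/1/4 blocking tasks, and each should settle at exactly `min(configuredThreads, seededTasks)` active tasks:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogue of the suggested partitioning test: each pool should end up running
// min(threads, tasks) tasks and never pick up another pool's work.
public class PoolPartitioningSketch {
  public static void main(String[] args) throws Exception {
    int[] threads = {2, 3, 4, 5};
    int[] tasks = {5, 6, 1, 4};
    CountDownLatch release = new CountDownLatch(1);
    ThreadPoolExecutor[] pools = new ThreadPoolExecutor[threads.length];

    for (int i = 0; i < threads.length; i++) {
      pools[i] = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads[i]);
      for (int t = 0; t < tasks[i]; t++) {
        // each seeded "transaction" blocks until released, so it stays in the running state
        pools[i].submit(() -> {
          release.await();
          return null;
        });
      }
    }

    Thread.sleep(1000); // crude stand-in for the polling/assertion the real test would do

    for (int i = 0; i < threads.length; i++) {
      int expected = Math.min(threads[i], tasks[i]);
      System.out.printf("pool_%d: active=%d expected=%d%n", i + 1, pools[i].getActiveCount(),
          expected);
    }

    release.countDown();
    for (ThreadPoolExecutor pool : pools) {
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
    }
  }
}
```

In the actual IT, the four pools would presumably come from the FATE pool config property and the seeded work from the fate store, with a Wait.waitFor style assertion in place of the sleep and print.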
########## core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java: ##########

@@ -219,4 +220,83 @@ public void testTypeFILENAME_EXT() { invalid(null, "RF", "map", "", "MAP", "rF", "Rf", " rf "); } + @Test + public void testTypeFATE_USER_CONFIG() { + var allUserFateOps = Fate.FateOperation.getAllUserFateOps(); + int poolSize1 = allUserFateOps.size() / 2; + var validPool1Ops = + allUserFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(",")); + var validPool2Ops = + allUserFateOps.stream().map(Enum::name).skip(poolSize1).collect(Collectors.joining(",")); + var invalidPool1Ops = + allUserFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(",")); + var invalidPool2Ops = allUserFateOps.stream().map(Enum::name).skip(poolSize1 + 1) + .collect(Collectors.joining(","));

Review Comment:
Could move the declaration of these down prior to the `invalid()` call. This is a good test, but it's hard to follow w/ all the json. That move would help a tiny bit w/ readability. Feel free to ignore this comment, just a minor style thing.

########## core/src/main/java/org/apache/accumulo/core/fate/Fate.java: ##########

@@ -144,260 +141,175 @@ public TFateOperation toThrift() { } return top; } + + public static Set<FateOperation> getAllUserFateOps() { + return allUserFateOps; + } + + public static Set<FateOperation> getAllMetaFateOps() { + return allMetaFateOps; + } } - /** - * A single thread that finds transactions to work on and queues them up. Do not want each worker - * thread going to the store and looking for work as it would place more load on the store. - */ - private class WorkFinder implements Runnable { + // The fate pools watcher: + // - Maintains a TransactionRunner per available thread per pool/FateExecutor. Does so by + // periodically checking the pools for an inactive thread (i.e., a thread running a + // TransactionRunner died or the pool size was increased in the property), resizing the pool and + // submitting new runners as needed. Also safely stops the necessary number of TransactionRunners + // if the pool size in the property was decreased. + // - Warns the user to consider increasing the pool size (or splitting the fate ops assigned to + // that pool into separate pools) for any pool that does not often have any idle threads. + private class FatePoolsWatcher implements Runnable { + private final T environment; + private final AccumuloConfiguration conf; + + private FatePoolsWatcher(T environment, AccumuloConfiguration conf) { + this.environment = environment; + this.conf = conf; + } @Override public void run() { - while (keepRunning.get()) { - try { - store.runnable(keepRunning, fateId -> { - while (keepRunning.get()) { - try { - // The reason for calling transfer instead of queueing is avoid rescanning the - // storage layer and adding the same thing over and over. For example if all threads - // were busy, the queue size was 100, and there are three runnable things in the - // store. Do not want to keep scanning the store adding those same 3 runnable things - // until the queue is full. - if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) { - break; - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } + // Read from the config here and here only.
Must avoid reading the same property from the + // config more than once since it can change at any point in this execution + var poolConfigs = getPoolConfigurations(conf); + var idleCheckIntervalMillis = conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL); + + // shutdown task: shutdown fate executors whose set of fate operations are no longer present + // in the config + synchronized (fateExecutors) { + final var fateExecutorsIter = fateExecutors.iterator(); + while (fateExecutorsIter.hasNext()) { + var fateExecutor = fateExecutorsIter.next(); + + // if this fate executors set of fate ops is no longer present in the config... + if (!poolConfigs.containsKey(fateExecutor.getFateOps())) { + if (!fateExecutor.isShutdown()) { + log.debug("The config for {} has changed invalidating {}. Gracefully shutting down " + + "the FateExecutor.", getFateConfigProp(), fateExecutor); + fateExecutor.initiateShutdown(); + } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) { + log.debug("{} has been shutdown, but is still actively working on transactions.", + fateExecutor); + } else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) { + log.debug("{} has been shutdown and all threads have safely terminated.", + fateExecutor); + fateExecutorsIter.remove(); } - }); - } catch (Exception e) { - if (keepRunning.get()) { - log.warn("Failure while attempting to find work for fate", e); - } else { - log.debug("Failure while attempting to find work for fate", e); } - - workQueue.clear(); } } - } - } - - private class TransactionRunner implements Runnable { - // used to signal a TransactionRunner to stop in the case where there are too many running - // i.e., the property for the pool size decreased and we have excess TransactionRunners - private final AtomicBoolean stop = new AtomicBoolean(false); - private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException { - while (keepRunning.get() && !stop.get()) { - FateId unreservedFateId = workQueue.poll(100, MILLISECONDS); - - if (unreservedFateId == null) { - continue; - } - var optionalopStore = store.tryReserve(unreservedFateId); - if (optionalopStore.isPresent()) { - return optionalopStore; + // replacement task: at this point, the existing FateExecutors that were invalidated by the + // config changes have started shutdown or finished shutdown. 
Now create any new replacement + FateExecutors needed + for (var poolConfig : poolConfigs.entrySet()) { + var configFateOps = poolConfig.getKey(); + var configPoolSize = poolConfig.getValue(); + synchronized (fateExecutors) { + if (fateExecutors.stream().map(FateExecutor::getFateOps) + .noneMatch(fo -> fo.equals(configFateOps))) { + fateExecutors + .add(new FateExecutor<>(Fate.this, environment, configFateOps, configPoolSize)); + } } } - return Optional.empty(); - } - - @Override - public void run() { - runningTxRunners.add(this); - try { - while (keepRunning.get() && !stop.get()) { - FateTxStore<T> txStore = null; - ExecutionState state = new ExecutionState(); - try { - var optionalopStore = reserveFateTx(); - if (optionalopStore.isPresent()) { - txStore = optionalopStore.orElseThrow(); - } else { - continue; - } - state.status = txStore.getStatus(); - state.op = txStore.top(); - if (state.status == FAILED_IN_PROGRESS) { - processFailed(txStore, state.op); - } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) { + // resize task: For each fate executor, resize the pool to match the config as necessary and + submit new TransactionRunners if the pool grew, stop TransactionRunners if the pool + shrunk, and potentially suggest resizing the pool if the load is consistently high. + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + if (fateExecutor.isShutdown()) { + continue; + } + final var pool = fateExecutor.getTransactionExecutor(); + final var poolName = fateExecutor.getPoolName(); + final var runningTxRunners = fateExecutor.getRunningTxRunners(); + final int configured = poolConfigs.get(fateExecutor.getFateOps()); + ThreadPools.resizePool(pool, () -> configured, poolName); + final int needed = configured - runningTxRunners.size();

Review Comment:
Could all of this code for resizing be pushed into FateExecutor? Seems like a lot of things are requested from fateExecutor for the code here, but I'm not sure if this code is exclusively working w/ refs from fateExecutor. Wondering if here we could call `fateExecutor.updateSizeConfig(configured)` and then a method inside FateExecutor takes care of starting/stopping threads if needed.
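A rough sketch of the kind of encapsulation being suggested, using a simplified stand-in class rather than the PR's actual FateExecutor: the names here (pool, runningTxRunners, the busy-wait runner body) and the direct use of ThreadPoolExecutor instead of ThreadPools.resizePool are all assumptions made for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for a FateExecutor-like class, showing how the watcher's resize logic
// could live behind a single updateSizeConfig(configured) call. Not the PR's actual API.
class FateExecutorSketch {
  private final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  // one stop flag per running "transaction runner" (stand-in for the real tracking)
  private final Set<AtomicBoolean> runningTxRunners = ConcurrentHashMap.newKeySet();

  // Called periodically by the pools watcher with the pool size it read from the config.
  void updateSizeConfig(int configured) {
    // resize the underlying executor; grow max before core (and shrink core before max) so the
    // pool never momentarily has core > max
    if (configured > pool.getMaximumPoolSize()) {
      pool.setMaximumPoolSize(configured);
      pool.setCorePoolSize(configured);
    } else {
      pool.setCorePoolSize(configured);
      pool.setMaximumPoolSize(configured);
    }

    int needed = configured - runningTxRunners.size();
    for (int i = 0; i < needed; i++) {
      // pool grew: start a new runner to fill the extra slot
      AtomicBoolean stop = new AtomicBoolean(false);
      runningTxRunners.add(stop);
      pool.execute(() -> {
        while (!stop.get()) {
          Thread.onSpinWait(); // placeholder: the real runner reserves and works on transactions
        }
        runningTxRunners.remove(stop);
      });
    }
    int excess = -needed;
    for (AtomicBoolean stop : runningTxRunners) {
      if (excess <= 0) {
        break;
      }
      // pool shrank: ask a runner that is not already stopping to exit after its current work
      if (stop.compareAndSet(false, true)) {
        excess--;
      }
    }
  }

  public static void main(String[] args) {
    FateExecutorSketch fateExecutor = new FateExecutorSketch();
    fateExecutor.updateSizeConfig(4); // config said 4 threads
    fateExecutor.updateSizeConfig(2); // config later dropped to 2
    // stop everything so the example exits
    fateExecutor.runningTxRunners.forEach(stop -> stop.set(true));
    fateExecutor.pool.shutdown();
  }
}
```

With something like this, the watcher loop would only need to look up the configured size and call `fateExecutor.updateSizeConfig(configured)`, keeping the thread start/stop bookkeeping inside FateExecutor.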
########## core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java: ########## @@ -219,4 +220,83 @@ public void testTypeFILENAME_EXT() { invalid(null, "RF", "map", "", "MAP", "rF", "Rf", " rf "); } + @Test + public void testTypeFATE_USER_CONFIG() { + var allUserFateOps = Fate.FateOperation.getAllUserFateOps(); + int poolSize1 = allUserFateOps.size() / 2; + var validPool1Ops = + allUserFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(",")); + var validPool2Ops = + allUserFateOps.stream().map(Enum::name).skip(poolSize1).collect(Collectors.joining(",")); + var invalidPool1Ops = + allUserFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(",")); + var invalidPool2Ops = allUserFateOps.stream().map(Enum::name).skip(poolSize1 + 1) + .collect(Collectors.joining(",")); + // should be valid: one pool for all ops, order should not matter, all ops split across + // multiple pools + valid( + "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + "\": 10}", + "{\"" + validPool2Ops + "," + validPool1Ops + "\": 10}", + "{\"" + validPool1Ops + "\": 2, \"" + validPool2Ops + "\": 3}"); + // should be invalid: invalid json, null, missing FateOperation, pool size of 0, pool size of + // -1, invalid pool size, invalid key, same FateOperation repeated in a different pool, invalid + // FateOperation + invalid("", null, "{\"" + invalidPool1Ops + "\": 2, \"" + invalidPool2Ops + "\": 3}", + "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": 0}", + "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + "\": -1}", + "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": x}", + "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(", ")) + + "\": 10}", + "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + "\": 10, \"" + + allUserFateOps.stream().map(Enum::name).limit(1).collect(Collectors.joining(",")) + + "\": 10}", + "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + ",INVALID_FATEOP\": 10}"); + } + + @Test + public void testTypeFATE_META_CONFIG() { + var allMetaFateOps = Fate.FateOperation.getAllMetaFateOps(); + int poolSize1 = allMetaFateOps.size() / 2; + var validPool1Ops = + allMetaFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(",")); + var validPool2Ops = + allMetaFateOps.stream().map(Enum::name).skip(poolSize1).collect(Collectors.joining(",")); + var invalidPool1Ops = + allMetaFateOps.stream().map(Enum::name).limit(poolSize1).collect(Collectors.joining(",")); + var invalidPool2Ops = allMetaFateOps.stream().map(Enum::name).skip(poolSize1 + 1) + .collect(Collectors.joining(",")); + // should be valid: one pool for all ops, order should not matter, all ops split across + // multiple pools + valid( + "{\"" + allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + "\": 10}", + "{\"" + validPool2Ops + "," + validPool1Ops + "\": 10}", + "{\"" + validPool1Ops + "\": 2, \"" + validPool2Ops + "\": 3}"); + // should be invalid: invalid json, null, missing FateOperation, pool size of 0, pool size of + // -1, invalid pool size, invalid key, same FateOperation repeated in a different pool, invalid + // FateOperation + invalid("", null, "{\"" + invalidPool1Ops + "\": 2, \"" + invalidPool2Ops + "\": 3}", + "{\"" + allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": 
0}", + "{\"" + allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + "\": -1}", + "{\"" + allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": x}", + "{\"" + allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(", ")) + + "\": 10}", + "{\"" + allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + "\": 10, \"" + + allMetaFateOps.stream().map(Enum::name).limit(1).collect(Collectors.joining(",")) + + "\": 10}", + "{\"" + allMetaFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + + ",INVALID_FATEOP\": 10}"); + } Review Comment: I was trying to correlate the comment w/ the code and was finding that difficult so made the following changes locally. What is in the comment is really good coverage of the error cases. Realized the comment seems to be in the same order as the code after making these changes. ```suggestion // should be invalid: invalid json, null, missing FateOperation, invalid("", null, "{\"" + invalidPool1Ops + "\": 2, \"" + invalidPool2Ops + "\": 3}", // pool size of 0 "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": 0}", // pool size of -1 "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": -1}", // invalid pool size "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": x}", // invalid key because of space "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(", ")) + "\": 10}", // same FateOperation repeated in a different pool "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "\": 10, \"" + allUserFateOps.stream().map(Enum::name).limit(1).collect(Collectors.joining(",")) + "\": 10}", // invalid FateOperation "{\"" + allUserFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + ",INVALID_FATEOP\": 10}"); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
