keith-turner commented on code in PR #5301:
URL: https://github.com/apache/accumulo/pull/5301#discussion_r1996136801
##########
test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherIT.java:
##########
@@ -357,6 +357,151 @@ protected void testIdleCountHistory(FateStore<PoolResizeTestEnv> store, ServerCo
}
}
+ @Test
+ public void testFatePoolsPartitioning() throws Exception {
+ executeTest(this::testFatePoolsPartitioning);
+ }
+
+ protected void testFatePoolsPartitioning(FateStore<PoolResizeTestEnv> store, ServerContext sctx)
+ throws Exception {
+ // Ensures FATE ops are correctly partitioned between the pools. Configures 4 FateExecutors:
+ // FateExecutor1 with 2 threads operating on 1/4 of FATE ops
+ // FateExecutor2 with 3 threads operating on 1/4 of FATE ops
+ // FateExecutor3 with 4 threads operating on 1/4 of FATE ops
+ // FateExecutor4 with 5 threads operating on 1/4 of FATE ops
+ // Seeds:
+ // 5 transactions on FateExecutor1
+ // 6 transactions on FateExecutor2
+ // 1 transaction on FateExecutor3
+ // 4 transactions on FateExecutor4
+ // Ensures that we only see min(configured threads, transactions seeded) ever running
+ // Also ensures that FateExecutors do not pick up any work that they shouldn't
+ final int numThreadsPool1 = 2;
+ final int numThreadsPool2 = 3;
+ final int numThreadsPool3 = 4;
+ final int numThreadsPool4 = 5;
+ final int numSeedPool1 = 5;
+ final int numSeedPool2 = 6;
+ final int numSeedPool3 = 1;
+ final int numSeedPool4 = 4;
+
+ final int numUserOpsPerPool = ALL_USER_FATE_OPS.size() / 4;
+ final int numMetaOpsPerPool = ALL_META_FATE_OPS.size() / 4;
+
+ final Set<Fate.FateOperation> userPool1 =
+     ALL_USER_FATE_OPS.stream().limit(numUserOpsPerPool).collect(Collectors.toSet());
+ final Set<Fate.FateOperation> userPool2 = ALL_USER_FATE_OPS.stream().skip(numUserOpsPerPool)
+ .limit(numUserOpsPerPool).collect(Collectors.toSet());
+ final Set<Fate.FateOperation> userPool3 = ALL_USER_FATE_OPS.stream().skip(numUserOpsPerPool * 2)
+ .limit(numUserOpsPerPool).collect(Collectors.toSet());
+ // no limit for pool 4 in case total num ops is not evenly divisible by 4
+ final Set<Fate.FateOperation> userPool4 =
+     ALL_USER_FATE_OPS.stream().skip(numUserOpsPerPool * 3).collect(Collectors.toSet());
+
+ final Set<Fate.FateOperation> metaPool1 =
+     ALL_META_FATE_OPS.stream().limit(numMetaOpsPerPool).collect(Collectors.toSet());
+ final Set<Fate.FateOperation> metaPool2 = ALL_META_FATE_OPS.stream().skip(numMetaOpsPerPool)
+ .limit(numMetaOpsPerPool).collect(Collectors.toSet());
+ final Set<Fate.FateOperation> metaPool3 = ALL_META_FATE_OPS.stream().skip(numMetaOpsPerPool * 2)
+ .limit(numMetaOpsPerPool).collect(Collectors.toSet());
+ // no limit for pool 4 in case total num ops is not evenly divisible by 4
+ final Set<Fate.FateOperation> metaPool4 =
+     ALL_META_FATE_OPS.stream().skip(numMetaOpsPerPool * 3).collect(Collectors.toSet());
+
+ final ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_USER_CONFIG,
+     String.format("{\"%s\": %s, \"%s\": %s, \"%s\": %s, \"%s\": %s}",
+         userPool1.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool1,
+         userPool2.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool2,
+         userPool3.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool3,
+         userPool4.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool4));
+ config.set(Property.MANAGER_FATE_META_CONFIG,
+     String.format("{\"%s\": %s, \"%s\": %s, \"%s\": %s, \"%s\": %s}",
+         metaPool1.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool1,
+         metaPool2.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool2,
+         metaPool3.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool3,
+         metaPool4.stream().map(Enum::name).collect(Collectors.joining(",")), numThreadsPool4));
+ config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m");
+
+ final boolean isUserStore = store.type() == FateInstanceType.USER;
+
+ final Fate.FateOperation fateOpFromPool1 =
+     isUserStore ? userPool1.iterator().next() : metaPool1.iterator().next();
+ final Fate.FateOperation fateOpFromPool2 =
+     isUserStore ? userPool2.iterator().next() : metaPool2.iterator().next();
+ final Fate.FateOperation fateOpFromPool3 =
+     isUserStore ? userPool3.iterator().next() : metaPool3.iterator().next();
+ final Fate.FateOperation fateOpFromPool4 =
+     isUserStore ? userPool4.iterator().next() : metaPool4.iterator().next();
+
+ final Set<Fate.FateOperation> pool1 = isUserStore ? userPool1 : metaPool1;
+ final Set<Fate.FateOperation> pool2 = isUserStore ? userPool2 : metaPool2;
+ final Set<Fate.FateOperation> pool3 = isUserStore ? userPool3 : metaPool3;
+ final Set<Fate.FateOperation> pool4 = isUserStore ? userPool4 : metaPool4;
+
+ boolean allAssertsOccurred = false;
+ final var env = new PoolResizeTestEnv();
+ final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r -> r + "", config);
+
+ try {
+ // seeding pool1/FateExecutor1
+ for (int i = 0; i < numSeedPool1; i++) {
+ fate.seedTransaction(fateOpFromPool1, fate.startTransaction(), new PoolResizeTestRepo(),
+     true, "testing");
+ }
+ // seeding pool2/FateExecutor2
+ for (int i = 0; i < numSeedPool2; i++) {
+ fate.seedTransaction(fateOpFromPool2, fate.startTransaction(), new PoolResizeTestRepo(),
+     true, "testing");
+ }
+ // seeding pool3/FateExecutor3
+ for (int i = 0; i < numSeedPool3; i++) {
+ fate.seedTransaction(fateOpFromPool3, fate.startTransaction(), new PoolResizeTestRepo(),
+     true, "testing");
+ }
+ // seeding pool4/FateExecutor4
+ for (int i = 0; i < numSeedPool4; i++) {
+ fate.seedTransaction(fateOpFromPool4, fate.startTransaction(), new PoolResizeTestRepo(),
+     true, "testing");
+ }
+
+ Wait.waitFor(() -> env.numWorkers.get()
+     == Math.min(numThreadsPool1, numSeedPool1) + Math.min(numThreadsPool2, numSeedPool2)
+         + Math.min(numThreadsPool3, numSeedPool3) + Math.min(numThreadsPool4, numSeedPool4));
+ // wait for all transaction runners to be active
+ Wait.waitFor(() -> fate.getTotalTxRunnersActive()
+     == numThreadsPool1 + numThreadsPool2 + numThreadsPool3 + numThreadsPool4);
+ assertEquals(numThreadsPool1, fate.getTxRunnersActive(pool1));
+ assertEquals(numThreadsPool2, fate.getTxRunnersActive(pool2));
+ assertEquals(numThreadsPool3, fate.getTxRunnersActive(pool3));
+ assertEquals(numThreadsPool4, fate.getTxRunnersActive(pool4));
+
+ // wait a bit longer to ensure another iteration of the pool watcher check doesn't change
+ // anything
+ Thread.sleep(fate.getPoolWatcherDelay().toMillis() + 1_000);
+
+ assertEquals(env.numWorkers.get(),
Review Comment:
This is a good check that verifies things from the perspective of the actually running threads, however it only verifies the sum. With the values above, the expected sum is min(2,5) + min(3,6) + min(4,1) + min(5,4) = 2 + 3 + 1 + 4 = 10, so a skewed distribution of workers across the pools could still pass. In addition to this check, maybe you could do something like the following to verify what is in the store at a more granular level, if it would work. The two checks together would give a stronger verification.
```
Map<Fate.FateOperation,Long> seenCounts = store.list()
    .filter(fateIdStatus -> fateIdStatus.getFateOperation().isPresent()
        && fateIdStatus.getFateReservation().isPresent())
    .collect(Collectors.groupingBy(fis -> fis.getFateOperation().orElseThrow(),
        Collectors.counting()));
Map<Fate.FateOperation,Long> expectedCounts =
    Map.of(fateOpFromPool1, (long) Math.min(numThreadsPool1, numSeedPool1), ...);
assertEquals(expectedCounts, seenCounts);
```
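For reference, a minimal sketch of how the elided `Map.of(...)` entries could expand, assuming (as in the test above) that every transaction in pool N was seeded with the single `fateOpFromPoolN` operation:
```
// Sketch only: expands the elided entries above. Assumes each pool's
// transactions were all seeded with the single fateOpFromPoolN chosen in
// the test, so the reserved count per operation is min(threads, seeded).
Map<Fate.FateOperation,Long> expectedCounts = Map.of(
    fateOpFromPool1, (long) Math.min(numThreadsPool1, numSeedPool1),
    fateOpFromPool2, (long) Math.min(numThreadsPool2, numSeedPool2),
    fateOpFromPool3, (long) Math.min(numThreadsPool3, numSeedPool3),
    fateOpFromPool4, (long) Math.min(numThreadsPool4, numSeedPool4));
assertEquals(expectedCounts, seenCounts);
```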
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]