keith-turner commented on code in PR #6168:
URL: https://github.com/apache/accumulo/pull/6168#discussion_r2891246770
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1288,16 +1308,56 @@ public void mainWait() throws InterruptedException {
Thread.sleep(500);
}
- protected Fate<FateEnv> initializeFateInstance(ServerContext context,
FateStore<FateEnv> store) {
+ /**
+ * This method exists so tests can hook the creation of a fate instance.
+ */
+ @VisibleForTesting
+ protected Fate<FateEnv> createFateInstance(FateEnv env, FateStore<FateEnv>
store,
+ ServerContext context) {
+ return new Fate<>(env, store, true, TraceRepo::toLogString,
getConfiguration(),
+ context.getScheduledExecutor());
+ }
- final Fate<FateEnv> fateInstance = new Fate<>(this, store, true,
TraceRepo::toLogString,
- getConfiguration(), context.getScheduledExecutor());
+ private void setupFate(ServerContext context, MetricsInfo metricsInfo) {
+ try {
+ Predicate<ZooUtil.LockID> isLockHeld =
+ lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
+ var metaStore = new MetaFateStore<FateEnv>(context.getZooSession(),
+ primaryManagerLock.getLockID(), isLockHeld);
+ var metaInstance = createFateInstance(this, metaStore, context);
+ // configure this instance to process all data
+
metaInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META)));
+ var userStore = new UserFateStore<FateEnv>(context,
SystemTables.FATE.tableName(),
+ managerLock.getLockID(), isLockHeld);
+ var userFateClient = new FateClient<FateEnv>(userStore,
TraceRepo::toLogString);
+
+ var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8),
this::getSteadyTime);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
+ .scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES));
+ var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8),
this::getSteadyTime);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
+ .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES));
+
+ if (!fateClients.compareAndSet(null,
+ Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER,
userFateClient))) {
+ throw new IllegalStateException(
+ "Unexpected previous fateClient reference map already
initialized");
+ }
+ if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META,
metaInstance))) {
+ throw new IllegalStateException(
+ "Unexpected previous fate reference map already initialized");
+ }
- var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8),
this::getSteadyTime);
- ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
- .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
+ managerMetrics.configureFateMetrics(getConfiguration(), this);
Review Comment:
Made a different change with the same goal in commit
d74a34c4a07f62f6aef2830ed9c693850be24d82.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]