dlmarion commented on code in PR #6168:
URL: https://github.com/apache/accumulo/pull/6168#discussion_r2889850939


##########
core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java:
##########
@@ -287,6 +296,11 @@ public Set<ServiceLockPath> 
getCompactor(ResourceGroupPredicate resourceGroupPre
     return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, 
withLock);
   }
 
+  public Set<ServiceLockPath> getManagerAssistants(ResourceGroupPredicate 
resourceGroupPredicate,

Review Comment:
   In the method above the name is `AssistantManager`, but here it is `ManagerAssistants`. I think we should be consistent in the naming.
   
   Is it the case that clients will always connect only to the primary manager, and that is why it's called `Manager`?
   



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1288,16 +1308,56 @@ public void mainWait() throws InterruptedException {
     Thread.sleep(500);
   }
 
-  protected Fate<FateEnv> initializeFateInstance(ServerContext context, 
FateStore<FateEnv> store) {
+  /**
+   * This method exist so test can hook creating a fate instance.
+   */
+  @VisibleForTesting
+  protected Fate<FateEnv> createFateInstance(FateEnv env, FateStore<FateEnv> 
store,
+      ServerContext context) {
+    return new Fate<>(env, store, true, TraceRepo::toLogString, 
getConfiguration(),
+        context.getScheduledExecutor());
+  }
 
-    final Fate<FateEnv> fateInstance = new Fate<>(this, store, true, 
TraceRepo::toLogString,
-        getConfiguration(), context.getScheduledExecutor());
+  private void setupFate(ServerContext context, MetricsInfo metricsInfo) {
+    try {
+      Predicate<ZooUtil.LockID> isLockHeld =
+          lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
+      var metaStore = new MetaFateStore<FateEnv>(context.getZooSession(),
+          primaryManagerLock.getLockID(), isLockHeld);
+      var metaInstance = createFateInstance(this, metaStore, context);
+      // configure this instance to process all data
+      
metaInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META)));
+      var userStore = new UserFateStore<FateEnv>(context, 
SystemTables.FATE.tableName(),
+          managerLock.getLockID(), isLockHeld);
+      var userFateClient = new FateClient<FateEnv>(userStore, 
TraceRepo::toLogString);
+
+      var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), 
this::getSteadyTime);
+      ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
+          .scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES));
+      var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8), 
this::getSteadyTime);
+      ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
+          .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES));
+
+      if (!fateClients.compareAndSet(null,
+          Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, 
userFateClient))) {
+        throw new IllegalStateException(
+            "Unexpected previous fateClient reference map already 
initialized");
+      }
+      if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META, 
metaInstance))) {
+        throw new IllegalStateException(
+            "Unexpected previous fate reference map already initialized");
+      }
 
-    var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), 
this::getSteadyTime);
-    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
-        .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
+      managerMetrics.configureFateMetrics(getConfiguration(), this);

Review Comment:
   It might be useful to add a comment in the run method noting that this method must be called before the managerMetrics producer is registered, to help future maintainers.



##########
test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+public class ComprehensiveMultiManagerIT extends ComprehensiveITBase {
+
+  private static class ComprehensiveITConfiguration implements 
MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
+    }
+  }
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    ComprehensiveITConfiguration c = new ComprehensiveITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.securityOperations().changeUserAuthorizations("root", 
AUTHORIZATIONS);
+    }
+
+    // Start two more managers
+    getCluster().exec(Manager.class);
+    getCluster().exec(Manager.class);

Review Comment:
   Should this wait until all three managers are up and registered in ZooKeeper before continuing?



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -958,6 +975,26 @@ public void run() {
       throw new IllegalStateException("Unable to start server on host " + 
getBindAddress(), e);
     }
 
+    tserverSet.startListeningForTabletServerChanges(this);

Review Comment:
   I'm curious why this needed to be moved. Do the fate workers need the set of tablet servers?



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1391,15 +1451,60 @@ private long remaining(long deadline) {
     return Math.max(1, deadline - System.currentTimeMillis());
   }
 
+  private void getManagerLock() throws KeeperException, InterruptedException {

Review Comment:
   ```suggestion
     private void getManagerAssistantLock() throws KeeperException, 
InterruptedException {
   ```



##########
core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java:
##########
@@ -287,6 +296,11 @@ public Set<ServiceLockPath> 
getCompactor(ResourceGroupPredicate resourceGroupPre
     return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, 
withLock);
   }
 
+  public Set<ServiceLockPath> getManagerAssistants(ResourceGroupPredicate 
resourceGroupPredicate,

Review Comment:
   You should be able to remove the resource group predicate parameter here and use the `DEFAULT_RG_ONLY` predicate instead. Alternatively, this method could perhaps be removed in favor of the one below.



##########
core/src/main/java/org/apache/accumulo/core/fate/FateClient.java:
##########
@@ -46,8 +49,11 @@ public class FateClient<T> {
   private static final EnumSet<ReadOnlyFateStore.TStatus> FINISHED_STATES =
       EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
 
+  private AtomicReference<Consumer<FateId>> seedingConsumer = new 
AtomicReference<>(fid -> {});
+
   public FateClient(FateStore<T> store, Function<Repo<T>,String> toLogStrFunc) 
{
     this.store = FateLogger.wrap(store, toLogStrFunc, false);
+    ;

Review Comment:
   ```suggestion
   ```



##########
server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorkerEnv.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.fate;
+
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.FILE_RENAME_POOL;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.core.util.time.SteadyTime;
+import org.apache.accumulo.manager.EventCoordinator;
+import org.apache.accumulo.manager.EventPublisher;
+import org.apache.accumulo.manager.EventQueue;
+import org.apache.accumulo.manager.split.FileRangeCache;
+import org.apache.accumulo.manager.tableOps.FateEnv;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.manager.LiveTServerSet;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FateWorkerEnv implements FateEnv {
+
+  private static final Logger log = 
LoggerFactory.getLogger(FateWorkerEnv.class);
+
+  private final ServerContext ctx;
+  private final ExecutorService refreshPool;
+  private final ExecutorService renamePool;
+  private final ServiceLock serviceLock;
+  private final FileRangeCache fileRangeCache;
+  private final EventHandler eventHandler;
+  private final LiveTServerSet liveTServerSet;
+
+  private final EventQueue queue = new EventQueue();
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private final Thread eventSendThread;
+
+  public void stop() {
+    stopped.set(true);
+    try {
+      eventSendThread.join();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private class EventSender implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped.get()) {
+        try {
+          var events = queue.poll(100, TimeUnit.MILLISECONDS);
+          if (events.isEmpty()) {
+            continue;
+          }
+
+          var tEvents = 
events.stream().map(EventCoordinator.Event::toThrift).toList();
+
+          var client = ThriftClientTypes.MANAGER.getConnection(ctx);
+          try {
+            if (client != null) {
+              client.processEvents(TraceUtil.traceInfo(), ctx.rpcCreds(), 
tEvents);
+            }
+          } catch (TException e) {
+            log.warn("Failed to send events to manager", e);

Review Comment:
   ```suggestion
               log.warn("Failed to send events to primary manager", e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to