kevinrr888 commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1662598974


##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -106,13 +112,102 @@ public FateId create() {
   @Override
   protected void create(FateId fateId, FateKey key) {
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, 
key).serialize(),
+      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null, 
key).serialize(),
           NodeExistsPolicy.FAIL);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException(e);
     }
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), 
currSerNodeVal -> {
+        NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+        // The uuid handles the case where there was a ZK server fault and the 
write for this thread
+        // went through but that was not acknowledged, and we are reading our 
own write for 2nd
+        // time.
+        if (!currNodeVal.isReserved() || (currNodeVal.isReserved()
+            && currNodeVal.reservation.orElseThrow().equals(reservation))) {
+          FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+          // Add the FateReservation to the node to reserve
+          return new NodeValue(currNodeVal.status, reservation, 
currFateKey).serialize();
+        } else {
+          // This will not change the value to null but will return null
+          return null;
+        }
+      });
+      if (newSerNodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | 
AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    boolean isReserved;
+    try {
+      isReserved = getNode(fateId).isReserved();
+    } catch (Exception e) {
+      // Exception thrown, so node doesn't exist, so it is not reserved
+      isReserved = false;
+    }

Review Comment:
   Looking at getNode(), the NoNodeException (NNE) is already handled in that 
method, so I simplified isReserved().



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +323,14 @@ public void setStatus(TStatus status) {
       verifyReserved(true);
 
       try {
-        zk.putPersistentData(getTXPath(fateId), new 
NodeValue(status).serialize(),
-            NodeExistsPolicy.OVERWRITE);
-      } catch (KeeperException | InterruptedException e) {
+        zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+          NodeValue currNodeVal = new NodeValue(currSerializedData);

Review Comment:
   Fixed



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -315,6 +420,30 @@ public List<ReadOnlyRepo<T>> getStack() {
         return dops;
       }
     }
+
+    @Override
+    protected void unreserve() {
+      try {
+        if (!this.deleted) {
+          zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+            NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+            FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+            if ((currNodeVal.isReserved()
+                && 
currNodeVal.reservation.orElseThrow().equals(this.reservation))) {
+              // Remove the FateReservation from the NodeValue to unreserve
+              return new NodeValue(currNodeVal.status, null, 
currFateKey).serialize();
+            } else {
+              // possible this is running a 2nd time in zk server fault 
conditions and its first

Review Comment:
   Added



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -421,14 +548,30 @@ private Optional<FateKey> 
deserializeFateKey(DataInputBuffer buffer) throws IOEx
       return Optional.empty();
     }
 
+    private Optional<FateReservation> 
deserializeFateReservation(DataInputBuffer buffer)
+        throws IOException {
+      int length = buffer.readInt();
+      if (length > 0) {
+        return 
Optional.of(FateReservation.deserialize(buffer.readNBytes(length)));
+      }
+      return Optional.empty();
+    }
+
     byte[] serialize() {
       try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
           DataOutputStream dos = new DataOutputStream(baos)) {
         dos.writeUTF(status.name());
+        if (isReserved()) {

Review Comment:
   That sounds like a good idea



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -502,4 +520,14 @@ protected Serializable deserializeTxInfo(TxInfo txInfo, 
byte[] data) {
       throw new IllegalStateException("Bad node data " + txInfo);
     }
   }
+
+  /**
+   * TODO 4131 this is a temporary method used to create a dummy lock when 
using a FateStore outside

Review Comment:
   Yes, that sounds like a great change. And yes, you are correct that some of 
the fate admin utils require the Manager to be down, so that would be helpful 
for those as well.



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +154,76 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
+
+    FateMutator.Status status = 
newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the 
mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was 
written. We
+      // need to check if the mutation was written and if it was written by 
this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in 
this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      status = newMutator(fateId).requireReserved(reservation).tryMutate();
+      if (status.equals(FateMutator.Status.ACCEPTED)) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    return 
newMutator(fateId).requireReserved().tryMutate().equals(FateMutator.Status.ACCEPTED);

Review Comment:
   Understood, fixed



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -272,8 +340,12 @@ public FateInstanceType type() {
 
   private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
 
-    private FateTxStoreImpl(FateId fateId, boolean isReserved) {
-      super(fateId, isReserved);
+    private FateTxStoreImpl(FateId fateId) {
+      super(fateId);
+    }
+
+    private FateTxStoreImpl(FateId fateId, FateReservation reservation) {

Review Comment:
   That sounds like a good addition



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +154,76 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
+
+    FateMutator.Status status = 
newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the 
mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was 
written. We
+      // need to check if the mutation was written and if it was written by 
this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in 
this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      status = newMutator(fateId).requireReserved(reservation).tryMutate();
+      if (status.equals(FateMutator.Status.ACCEPTED)) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      }

Review Comment:
   Yeah, I wasn't sure about this mutation. This is a much better way to do 
this; fixed.



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -313,6 +317,26 @@ private void undo(FateId fateId, Repo<T> op) {
 
   }
 
+  /**
+   * A thread that finds reservations held by dead processes and unreserves 
them. Only one thread
+   * runs per store type across all Fate instances (one to clean up dead 
reservations for
+   * {@link org.apache.accumulo.core.fate.user.UserFateStore UserFateStore} 
and one to clean up dead
+   * reservations for {@link MetaFateStore}).
+   */
+  private class DeadReservationCleaner implements Runnable {
+    // TODO 4131 periodic check runs every 30 seconds
+    // Should this be longer? Shorter? A configurable Property? A function of 
something?
+    private static final long INTERVAL_MILLIS = 30_000;
+
+    @Override
+    public void run() {
+      while (keepRunning.get()) {
+        store.deleteDeadReservations();

Review Comment:
   Previously, after the DeadReservationCleaner died, the Manager continued to 
run normally, so I changed it to be a critical thread. When looking at this, I 
noticed that none of the other Fate threads are critical. It seems that maybe 
they should be? I can create a follow-on issue if so. If the WorkFinder or any 
of the actual workers (TransactionRunner) dies, the Manager will continue to 
run normally.



##########
test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java:
##########
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static 
org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+// TODO 4131 could potentially have separate classes for testing MetaFateStore 
and UserFateStore
+// similar to how FateTestRunner is used, however that interface doesn't work 
as nicely here
+// since we are using multiple stores instead of just one. Can do something 
similar to
+// FateTestRunner here if desired
+public class MultipleStoresIT extends SharedMiniClusterBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MultipleStoresIT.class);
+  @TempDir
+  private static File tempDir;
+  private static ZooKeeperTestingServer szk = null;
+  private static ZooReaderWriter zk;
+  private static final String FATE_DIR = "/fate";
+  private ClientContext client;
+
+  @BeforeEach
+  public void beforeEachSetup() {
+    client = (ClientContext) 
Accumulo.newClient().from(getClientProps()).build();
+  }
+
+  @AfterEach
+  public void afterEachTeardown() {
+    client.close();
+  }
+
+  @BeforeAll
+  public static void beforeAllSetup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+    szk = new ZooKeeperTestingServer(tempDir);
+    zk = szk.getZooReaderWriter();
+  }
+
+  @AfterAll
+  public static void afterAllTeardown() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    szk.close();
+  }
+
+  @Test
+  public void testReserveUnreserve() throws Exception {
+    testReserveUnreserve(FateInstanceType.META);
+    testReserveUnreserve(FateInstanceType.USER);
+  }
+
+  private void testReserveUnreserve(FateInstanceType storeType) throws 
Exception {
+    // reserving/unreserving a FateId should be reflected across instances of 
the stores
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> activeReservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    // Create the fate ids using store1
+    for (int i = 0; i < numFateIds; i++) {
+      assertTrue(allIds.add(store1.create()));
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Reserve half the fate ids using store1 and rest using store2, after 
reserving a fate id in
+    // one, should not be able to reserve the same in the other. Should also 
not matter that all the
+    // ids were created using store1
+    int count = 0;
+    for (FateId fateId : allIds) {
+      if (count % 2 == 0) {
+        reservations.add(store1.reserve(fateId));
+        assertTrue(store2.tryReserve(fateId).isEmpty());
+      } else {
+        reservations.add(store2.reserve(fateId));
+        assertTrue(store1.tryReserve(fateId).isEmpty());
+      }
+      count++;
+    }
+    // Both stores should return the same reserved transactions
+    activeReservations = store1.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+    activeReservations = store2.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+
+    // Test setting/getting the TStatus and unreserving the transactions
+    for (int i = 0; i < allIds.size(); i++) {
+      var reservation = reservations.get(i);
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus());
+      reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED);
+      assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, 
reservation.getStatus());
+      reservation.delete();
+      reservation.unreserve(Duration.ofMillis(0));
+      // Attempt to set a status on a tx that has been unreserved (should 
throw exception)
+      assertThrows(IllegalStateException.class,
+          () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW));
+    }
+    assertTrue(store1.getActiveReservations().isEmpty());
+    assertTrue(store2.getActiveReservations().isEmpty());
+  }
+
+  @Test
+  public void testReserveNonExistentTxn() throws Exception {
+    testReserveNonExistentTxn(FateInstanceType.META);
+    testReserveNonExistentTxn(FateInstanceType.USER);
+  }
+
+  private void testReserveNonExistentTxn(FateInstanceType storeType) throws 
Exception {
+    // Tests that reserve() doesn't hang indefinitely and instead throws an 
error
+    // on reserve() a non-existent transaction.
+    final FateStore<SleepingTestEnv> store;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final String tableName = getUniqueNames(1)[0];
+    final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId));
+  }
+
+  @Test
+  public void testReserveReservedAndUnreserveUnreserved() throws Exception {
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.META);
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.USER);
+  }
+
+  private void testReserveReservedAndUnreserveUnreserved(FateInstanceType 
storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Try to reserve again, should not reserve
+    for (FateId fateId : allIds) {
+      assertTrue(store.tryReserve(fateId).isEmpty());
+    }
+
+    // Unreserve all the FateIds
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+    // Try to unreserve again (should throw exception)
+    for (var reservation : reservations) {
+      assertThrows(IllegalStateException.class, () -> 
reservation.unreserve(Duration.ofMillis(0)));
+    }
+  }
+
+  @Test
+  public void testReserveAfterUnreserveAndReserveAfterDeleted() throws 
Exception {
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.META);
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.USER);
+  }
+
+  private void 
testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Unreserve all
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+
+    // Ensure they can be reserved again, and delete and unreserve this time
+    for (FateId fateId : allIds) {
+      // Verify that the tx status is still NEW after unreserving since it 
hasn't been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, 
store.read(fateId).getStatus());
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservation.orElseThrow().delete();
+      reservation.orElseThrow().unreserve(Duration.ofMillis(0));
+    }
+
+    for (FateId fateId : allIds) {
+      // Verify that the tx is now unknown since it has been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, 
store.read(fateId).getStatus());
+      // Attempt to reserve a deleted txn, should throw an exception and not 
wait indefinitely
+      assertThrows(IllegalStateException.class, () -> store.reserve(fateId));
+    }
+  }
+
+  @Test
+  public void testMultipleFateInstances() throws Exception {
+    testMultipleFateInstances(FateInstanceType.META);
+    testMultipleFateInstances(FateInstanceType.USER);
+  }
+
+  private void testMultipleFateInstances(FateInstanceType storeType) throws 
Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
+    final SleepingTestEnv testEnv2 = new SleepingTestEnv(50);
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    Fate<SleepingTestEnv> fate1 =
+        new Fate<>(testEnv1, store1, Object::toString, 
DefaultConfiguration.getInstance());
+    fate1.startDeadReservationCleaner();
+    Fate<SleepingTestEnv> fate2 =
+        new Fate<>(testEnv2, store2, Object::toString, 
DefaultConfiguration.getInstance());
+    fate2.startDeadReservationCleaner();
+
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId;
+      // Start half the txns using fate1, and the other half using fate2
+      if (i % 2 == 0) {
+        fateId = fate1.startTransaction();
+        fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, 
"test");
+      } else {
+        fateId = fate2.startTransaction();
+        fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, 
"test");
+      }
+      allIds.add(fateId);
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Should be able to wait for completion on any fate instance
+    for (FateId fateId : allIds) {
+      fate2.waitForCompletion(fateId);
+    }
+    // Ensure that all txns have been executed and have only been executed once
+    assertTrue(Collections.disjoint(testEnv1.executedOps, 
testEnv2.executedOps));
+    assertEquals(allIds, Sets.union(testEnv1.executedOps, 
testEnv2.executedOps));
+
+    fate1.shutdown(1, TimeUnit.MINUTES);
+    fate2.shutdown(1, TimeUnit.MINUTES);
+  }
+
+  @Test
+  public void testDeadReservationsCleanup() throws Exception {
+    testDeadReservationsCleanup(FateInstanceType.META);
+    testDeadReservationsCleanup(FateInstanceType.USER);
+  }
+
+  private void testDeadReservationsCleanup(FateInstanceType storeType) throws 
Exception {
+    // Tests reserving some transactions, then simulating that the Manager 
died by creating
+    // a new Fate instance and store with a new LockID. The transactions which 
were
+    // reserved using the old LockID should be cleaned up by Fate's 
DeadReservationCleaner,
+    // then picked up by the new Fate/store.
+
+    final String tableName = getUniqueNames(1)[0];
+    // One transaction for each FATE worker thread
+    final int numFateIds =
+        
Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue());
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<LatchTestEnv> mockedStore1, store2;
+    final LatchTestEnv testEnv1 = new LatchTestEnv();
+    final LatchTestEnv testEnv2 = new LatchTestEnv();
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> reservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      mockedStore1 = EasyMock.createMockBuilder(UserFateStore.class)
+          .withConstructor(ClientContext.class, String.class, 
ZooUtil.LockID.class)
+          .withArgs(client, tableName, 
lock1).addMockedMethod("isDeadReservation").createMock();

Review Comment:
   For now, I went with your next suggestion of passing the predicate to the 
stores. This is not as end-to-end because the lock is not actually in ZK, but 
it may be good enough for now. A fully end-to-end test can be done in a 
follow-on issue if desired.



##########
test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java:
##########
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static 
org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+// TODO 4131 could potentially have separate classes for testing MetaFateStore 
and UserFateStore
+// similar to how FateTestRunner is used, however that interface doesn't work 
as nicely here
+// since we are using multiple stores instead of just one. Can do something 
similar to
+// FateTestRunner here if desired
+public class MultipleStoresIT extends SharedMiniClusterBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MultipleStoresIT.class);
+  @TempDir
+  private static File tempDir;
+  private static ZooKeeperTestingServer szk = null;
+  private static ZooReaderWriter zk;
+  private static final String FATE_DIR = "/fate";
+  private ClientContext client;
+
+  @BeforeEach
+  public void beforeEachSetup() {
+    client = (ClientContext) 
Accumulo.newClient().from(getClientProps()).build();
+  }
+
+  @AfterEach
+  public void afterEachTeardown() {
+    client.close();
+  }
+
+  @BeforeAll
+  public static void beforeAllSetup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+    szk = new ZooKeeperTestingServer(tempDir);
+    zk = szk.getZooReaderWriter();
+  }
+
+  @AfterAll
+  public static void afterAllTeardown() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    szk.close();
+  }
+
+  @Test
+  public void testReserveUnreserve() throws Exception {
+    testReserveUnreserve(FateInstanceType.META);
+    testReserveUnreserve(FateInstanceType.USER);
+  }
+
+  private void testReserveUnreserve(FateInstanceType storeType) throws 
Exception {
+    // reserving/unreserving a FateId should be reflected across instances of 
the stores
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> activeReservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    // Create the fate ids using store1
+    for (int i = 0; i < numFateIds; i++) {
+      assertTrue(allIds.add(store1.create()));
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Reserve half the fate ids using store1 and rest using store2, after 
reserving a fate id in
+    // one, should not be able to reserve the same in the other. Should also 
not matter that all the
+    // ids were created using store1
+    int count = 0;
+    for (FateId fateId : allIds) {
+      if (count % 2 == 0) {
+        reservations.add(store1.reserve(fateId));
+        assertTrue(store2.tryReserve(fateId).isEmpty());
+      } else {
+        reservations.add(store2.reserve(fateId));
+        assertTrue(store1.tryReserve(fateId).isEmpty());
+      }
+      count++;
+    }
+    // Both stores should return the same reserved transactions
+    activeReservations = store1.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+    activeReservations = store2.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+
+    // Test setting/getting the TStatus and unreserving the transactions
+    for (int i = 0; i < allIds.size(); i++) {
+      var reservation = reservations.get(i);
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus());
+      reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED);
+      assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, 
reservation.getStatus());
+      reservation.delete();
+      reservation.unreserve(Duration.ofMillis(0));
+      // Attempt to set a status on a tx that has been unreserved (should 
throw exception)
+      assertThrows(IllegalStateException.class,
+          () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW));
+    }
+    assertTrue(store1.getActiveReservations().isEmpty());
+    assertTrue(store2.getActiveReservations().isEmpty());
+  }
+
+  @Test
+  public void testReserveNonExistentTxn() throws Exception {
+    testReserveNonExistentTxn(FateInstanceType.META);
+    testReserveNonExistentTxn(FateInstanceType.USER);
+  }
+
+  private void testReserveNonExistentTxn(FateInstanceType storeType) throws 
Exception {
+    // Tests that reserve() doesn't hang indefinitely and instead throws an 
error
+    // on reserve() a non-existent transaction.
+    final FateStore<SleepingTestEnv> store;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final String tableName = getUniqueNames(1)[0];
+    final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId));
+  }
+
+  @Test
+  public void testReserveReservedAndUnreserveUnreserved() throws Exception {
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.META);
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.USER);
+  }
+
+  private void testReserveReservedAndUnreserveUnreserved(FateInstanceType 
storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Try to reserve again, should not reserve
+    for (FateId fateId : allIds) {
+      assertTrue(store.tryReserve(fateId).isEmpty());
+    }
+
+    // Unreserve all the FateIds
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+    // Try to unreserve again (should throw exception)
+    for (var reservation : reservations) {
+      assertThrows(IllegalStateException.class, () -> 
reservation.unreserve(Duration.ofMillis(0)));
+    }
+  }
+
+  @Test
+  public void testReserveAfterUnreserveAndReserveAfterDeleted() throws 
Exception {
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.META);
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.USER);
+  }
+
+  private void 
testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Unreserve all
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+
+    // Ensure they can be reserved again, and delete and unreserve this time
+    for (FateId fateId : allIds) {
+      // Verify that the tx status is still NEW after unreserving since it 
hasn't been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, 
store.read(fateId).getStatus());
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservation.orElseThrow().delete();
+      reservation.orElseThrow().unreserve(Duration.ofMillis(0));
+    }
+
+    for (FateId fateId : allIds) {
+      // Verify that the tx is now unknown since it has been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, 
store.read(fateId).getStatus());
+      // Attempt to reserve a deleted txn, should throw an exception and not 
wait indefinitely
+      assertThrows(IllegalStateException.class, () -> store.reserve(fateId));
+    }
+  }
+
+  @Test
+  public void testMultipleFateInstances() throws Exception {
+    testMultipleFateInstances(FateInstanceType.META);
+    testMultipleFateInstances(FateInstanceType.USER);
+  }
+
+  private void testMultipleFateInstances(FateInstanceType storeType) throws 
Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
+    final SleepingTestEnv testEnv2 = new SleepingTestEnv(50);
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    Fate<SleepingTestEnv> fate1 =
+        new Fate<>(testEnv1, store1, Object::toString, 
DefaultConfiguration.getInstance());
+    fate1.startDeadReservationCleaner();
+    Fate<SleepingTestEnv> fate2 =
+        new Fate<>(testEnv2, store2, Object::toString, 
DefaultConfiguration.getInstance());
+    fate2.startDeadReservationCleaner();
+
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId;
+      // Start half the txns using fate1, and the other half using fate2
+      if (i % 2 == 0) {
+        fateId = fate1.startTransaction();
+        fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, 
"test");
+      } else {
+        fateId = fate2.startTransaction();
+        fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, 
"test");
+      }
+      allIds.add(fateId);
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Should be able to wait for completion on any fate instance
+    for (FateId fateId : allIds) {
+      fate2.waitForCompletion(fateId);
+    }
+    // Ensure that all txns have been executed and have only been executed once
+    assertTrue(Collections.disjoint(testEnv1.executedOps, 
testEnv2.executedOps));
+    assertEquals(allIds, Sets.union(testEnv1.executedOps, 
testEnv2.executedOps));
+
+    fate1.shutdown(1, TimeUnit.MINUTES);
+    fate2.shutdown(1, TimeUnit.MINUTES);
+  }
+
+  @Test
+  public void testDeadReservationsCleanup() throws Exception {
+    testDeadReservationsCleanup(FateInstanceType.META);
+    testDeadReservationsCleanup(FateInstanceType.USER);
+  }
+
+  private void testDeadReservationsCleanup(FateInstanceType storeType) throws 
Exception {
+    // Tests reserving some transactions, then simulating that the Manager 
died by creating
+    // a new Fate instance and store with a new LockID. The transactions which 
were
+    // reserved using the old LockID should be cleaned up by Fate's 
DeadReservationCleaner,
+    // then picked up by the new Fate/store.
+
+    final String tableName = getUniqueNames(1)[0];
+    // One transaction for each FATE worker thread
+    final int numFateIds =
+        
Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue());
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<LatchTestEnv> mockedStore1, store2;
+    final LatchTestEnv testEnv1 = new LatchTestEnv();
+    final LatchTestEnv testEnv2 = new LatchTestEnv();
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> reservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      mockedStore1 = EasyMock.createMockBuilder(UserFateStore.class)
+          .withConstructor(ClientContext.class, String.class, 
ZooUtil.LockID.class)
+          .withArgs(client, tableName, 
lock1).addMockedMethod("isDeadReservation").createMock();
+    } else {
+      mockedStore1 = EasyMock.createMockBuilder(MetaFateStore.class)
+          .withConstructor(String.class, ZooReaderWriter.class, ZooCache.class,
+              ZooUtil.LockID.class)
+          .withArgs(FATE_DIR, zk, client.getZooCache(), 
lock1).addMockedMethod("isDeadReservation")
+          .createMock();

Review Comment:
   The predicate is a great idea and cleans up the code quite a bit; I have added it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to