keith-turner commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1696006961


##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -104,19 +109,108 @@ public FateId create() {
   }
 
   @Override
-  protected void create(FateId fateId, FateKey key) {
+  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+    final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, 
key).serialize(),
-          NodeExistsPolicy.FAIL);
-    } catch (KeeperException | InterruptedException e) {
+      byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId),
+          new NodeValue(TStatus.NEW, reservation, fateKey).serialize(), 
currSerNodeVal -> {
+            // We are only returning a non-null value for the following cases:
+            // 1) The existing NodeValue for fateId is exactly the same as the 
value set for the
+            // node if it doesn't yet exist:
+            // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = 
fateKey
+            // This might occur if there was a ZK server fault and the same 
write is running a 2nd
+            // time
+            // 2) The existing NodeValue for fateId has:
+            // TStatus = TStatus.NEW, no FateReservation present, FateKey = 
fateKey
+            // The fateId is NEW/unseeded and not reserved, so we can allow it 
to be reserved
+            NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+            if (currNodeVal.status == TStatus.NEW && 
currNodeVal.isReservedBy(reservation)) {
+              verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+              return currSerNodeVal;
+            } else if (currNodeVal.status == TStatus.NEW && 
!currNodeVal.isReserved()) {
+              verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+              // NEW/unseeded transaction and not reserved, so we can allow it 
to be reserved
+              return new NodeValue(TStatus.NEW, reservation, 
fateKey).serialize();
+            } else {
+              log.trace(
+                  "fate id {} tstatus {} fate key {} is reserved {} is either 
currently reserved "
+                      + "or has already been seeded with work (non-NEW 
status), or both",
+                  fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null),
+                  currNodeVal.isReserved());
+              // This will not change the value to null but will return null
+              return null;
+            }
+          });
+      if (nodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | 
AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), 
currSerNodeVal -> {

Review Comment:
   It's possible that nothing exists for this fateId in ZooKeeper.  Looking at the 
impl of `mutateExisting`, it calls 
[ZooKeeper.getData()](https://zookeeper.apache.org/doc/r3.9.2/apidocs/zookeeper-server/org/apache/zookeeper/ZooKeeper.html#getData(java.lang.String,boolean,org.apache.zookeeper.data.Stat))
 which will throw a NoNodeException in this case.  Looking at the usages of 
`mutateExisting`, it seems it's currently only used in places where we expect 
the node to always exist.  This case is a bit different because the fate id 
could be deleted, so we could add the following to the try/catch.
   
   ```java
       }catch(KeeperException.NoNodeException nne) {
         //TODO log trace?
         return Optional.empty();
   ```
   
   Also, the ITs could maybe test this case of attempting to reserve something 
that does not exist.
   



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -104,19 +109,108 @@ public FateId create() {
   }
 
   @Override
-  protected void create(FateId fateId, FateKey key) {
+  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+    final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, 
key).serialize(),
-          NodeExistsPolicy.FAIL);
-    } catch (KeeperException | InterruptedException e) {
+      byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId),
+          new NodeValue(TStatus.NEW, reservation, fateKey).serialize(), 
currSerNodeVal -> {
+            // We are only returning a non-null value for the following cases:
+            // 1) The existing NodeValue for fateId is exactly the same as the 
value set for the
+            // node if it doesn't yet exist:
+            // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = 
fateKey
+            // This might occur if there was a ZK server fault and the same 
write is running a 2nd
+            // time
+            // 2) The existing NodeValue for fateId has:
+            // TStatus = TStatus.NEW, no FateReservation present, FateKey = 
fateKey
+            // The fateId is NEW/unseeded and not reserved, so we can allow it 
to be reserved
+            NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+            if (currNodeVal.status == TStatus.NEW && 
currNodeVal.isReservedBy(reservation)) {
+              verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+              return currSerNodeVal;
+            } else if (currNodeVal.status == TStatus.NEW && 
!currNodeVal.isReserved()) {
+              verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+              // NEW/unseeded transaction and not reserved, so we can allow it 
to be reserved
+              return new NodeValue(TStatus.NEW, reservation, 
fateKey).serialize();
+            } else {
+              log.trace(
+                  "fate id {} tstatus {} fate key {} is reserved {} is either 
currently reserved "
+                      + "or has already been seeded with work (non-NEW 
status), or both",
+                  fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null),
+                  currNodeVal.isReserved());
+              // This will not change the value to null but will return null
+              return null;
+            }
+          });
+      if (nodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | 
AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), 
currSerNodeVal -> {
+        NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+        // The uuid handles the case where there was a ZK server fault and the 
write for this thread
+        // went through but that was not acknowledged, and we are reading our 
own write for 2nd
+        // time.
+        if (!currNodeVal.isReserved() || 
currNodeVal.isReservedBy(reservation)) {
+          FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+          // Add the FateReservation to the node to reserve
+          return new NodeValue(currNodeVal.status, reservation, 
currFateKey).serialize();
+        } else {
+          // This will not change the value to null but will return null
+          return null;
+        }
+      });
+      if (newSerNodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | 
AcceptableThriftTableOperationException e) {
       throw new IllegalStateException(e);
     }
   }
 
   @Override
-  protected Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId) {
-    final NodeValue node = getNode(fateId);
-    return new Pair<>(node.status, node.fateKey);
+  public void deleteDeadReservations() {
+    for (Map.Entry<FateId,FateReservation> entry : 
getActiveReservations().entrySet()) {
+      FateId fateId = entry.getKey();
+      FateReservation reservation = entry.getValue();
+      if (isLockHeld.test(reservation.getLockID())) {
+        continue;
+      }
+      try {
+        zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+          NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+          // Make sure the current node is still reserved and reserved with 
the expected reservation
+          // and it is dead
+          if (currNodeVal.isReservedBy(reservation)
+              && 
!isLockHeld.test(currNodeVal.reservation.orElseThrow().getLockID())) {
+            // Delete the reservation
+            log.trace("Deleted the dead reservation {} for fate id {}", 
reservation, fateId);
+            return new NodeValue(currNodeVal.status, null, 
currNodeVal.fateKey.orElse(null))
+                .serialize();
+          } else {
+            // No change
+            return null;
+          }
+        });
+      } catch (KeeperException | InterruptedException | 
AcceptableThriftTableOperationException e) {

Review Comment:
   We could catch NoNodeException here and just ignore it; it means the node was 
deleted between when we read it and when we tried to act on it.
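
A minimal sketch of that idea, using only the JDK so it runs on its own. `NoNodeException`, `nodes`, and `mutateExisting` here are hypothetical in-memory stand-ins, not the real ZooKeeper or Accumulo classes; only the shape of the catch block is the point.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

final class DeadReservationSketch {
  // Hypothetical stand-in for KeeperException.NoNodeException.
  static final class NoNodeException extends Exception {
    NoNodeException(String path) {
      super("no node at " + path);
    }
  }

  // Hypothetical in-memory stand-in for the ZooKeeper-backed node data.
  static final Map<String,String> nodes = new ConcurrentHashMap<>();

  // Stand-in for mutateExisting: throws if the node is missing; a null result means "no change".
  static void mutateExisting(String path, UnaryOperator<String> mutator) throws NoNodeException {
    String current = nodes.get(path);
    if (current == null) {
      throw new NoNodeException(path);
    }
    String updated = mutator.apply(current);
    if (updated != null) {
      nodes.put(path, updated);
    }
  }

  // Strips the reservation marker from each node; returns how many nodes were processed.
  static int deleteDeadReservations(Iterable<String> paths) {
    int processed = 0;
    for (String path : paths) {
      try {
        mutateExisting(path, value -> value.replace("|reserved", ""));
        processed++;
      } catch (NoNodeException e) {
        // The node was deleted between the read and the mutate, so there is nothing to clean up.
      }
    }
    return processed;
  }
}
```

With one existing node and one missing node, the loop processes the first and silently skips the second instead of failing the whole cleanup pass.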



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -363,13 +498,23 @@ protected Stream<FateIdStatus> 
getTransactions(Set<TStatus> statuses) {
       Stream<FateIdStatus> stream = zk.getChildren(path).stream().map(strTxid 
-> {
         String txUUIDStr = strTxid.split("_")[1];
         FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
-        // Memoizing for two reasons. First the status may never be requested, 
so in that case avoid
-        // the lookup. Second, if its requested multiple times the result will 
always be consistent.
-        Supplier<TStatus> statusSupplier = Suppliers.memoize(() -> 
_getStatus(fateId));
+        // Memoizing for two reasons. First the status or reservation may 
never be requested, so
+        // in that case avoid the lookup. Second, if it's requested multiple 
times the result will
+        // always be consistent.
+        Supplier<Pair<TStatus,Optional<FateReservation>>> statusAndResSupplier 
=
+            Suppliers.memoize(() -> {
+              NodeValue zkNode = getNode(fateId);
+              return new Pair<>(zkNode.status, zkNode.reservation);
+            });

Review Comment:
   Seems like the Pair could be dropped and this could be simplified to return 
the NodeValue.
   
   ```suggestion
           Supplier<NodeValue> statusAndResSupplier =
               Suppliers.memoize(() ->  getNode(fateId));
   ```



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -120,35 +127,161 @@ public FateId getFateId() {
   }
 
   @Override
-  protected void create(FateId fateId, FateKey fateKey) {
-    final int maxAttempts = 5;
-
+  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+    final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+    Optional<FateTxStore<T>> txStore = Optional.empty();
+    int maxAttempts = 5;
+    FateMutator.Status status = null;
+
+    // We first need to write the initial/unreserved value for the reservation 
column
+    // Only need to retry if it is UNKNOWN
     for (int attempt = 0; attempt < maxAttempts; attempt++) {
-
-      if (attempt >= 1) {
-        log.debug("Failed to create transaction with fateId {} and fateKey {}, 
trying again",
-            fateId, fateKey);
-        UtilWaitThread.sleep(100);
+      status = newMutator(fateId).putInitReservationVal().tryMutate();
+      if (status != FateMutator.Status.UNKNOWN) {
+        break;
       }
+      UtilWaitThread.sleep(100);
+    }
+    for (int attempt = 0; attempt < maxAttempts; attempt++) {

Review Comment:
   The way this code is structured, it's writing two mutations. Wondering if it 
could write everything in a single conditional mutation. The conditional 
mutation would have the following:
   
    * condition that reservation column is absent
    * condition that status column is absent
    * set status column to NEW
    * set reservation column to expected reservation
    * set key column
    * set time column
   
   I suspect the way it is structured has to do with NOT_RESERVED, but I do 
not understand why it needs to be this way.
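
A rough sketch of the single-write idea, modeled with plain JDK collections so it is runnable without Accumulo. The in-memory `table`, the column names, and the `Status` enum are hypothetical stand-ins for the real fate table and conditional writer; the point is only that both absence checks and all four puts happen in one atomic step.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SingleConditionalWriteSketch {
  enum Status { ACCEPTED, REJECTED }

  // Hypothetical in-memory table: row -> (column -> value).
  private final Map<String,Map<String,String>> table = new ConcurrentHashMap<>();

  // Checks both conditions and writes all columns in one atomic step per row.
  Status createAndReserve(String row, String reservation, String fateKey, long createTime) {
    Status[] result = {Status.REJECTED};
    table.compute(row, (r, cols) -> {
      Map<String,String> current = cols == null ? Map.of() : cols;
      // Conditions: the reservation column and the status column must both be absent.
      if (current.containsKey("reservation") || current.containsKey("status")) {
        return cols; // rejected, leave the row untouched
      }
      Map<String,String> updated = new HashMap<>(current);
      updated.put("status", "NEW");
      updated.put("reservation", reservation);
      updated.put("key", fateKey);
      updated.put("ctime", Long.toString(createTime));
      result[0] = Status.ACCEPTED;
      return Map.copyOf(updated);
    });
    return result[0];
  }

  // Returns the column value, or null if the row or column is absent.
  String get(String row, String column) {
    Map<String,String> cols = table.get(row);
    return cols == null ? null : cols.get(column);
  }
}
```

In this model a second caller with a different reservation is rejected and the first reservation stays in place, which is the outcome the two-mutation version has to reconstruct by scanning after a REJECTED status.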
   



##########
core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java:
##########
@@ -54,8 +55,15 @@ public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> op
 
   @Override
   public boolean accept(Key k, Value v) {
-    var tstatus = ReadOnlyFateStore.TStatus.valueOf(v.toString());
-    return valuesToAccept.contains(tstatus);
+    // We may see TStatus values or FateReservation values with how this 
filter is used,
+    // only accept TStatus values, return false on FateReservation values, 
error otherwise
+    try {
+      var tstatus = ReadOnlyFateStore.TStatus.valueOf(v.toString());
+      return valuesToAccept.contains(tstatus);
+    } catch (IllegalArgumentException e) {

Review Comment:
   This iterator may need to be reworked.  The iterator should probably return 
the status and reservation columns for a row only if the status matches.  This 
iterator looks at individual key values, so it cannot consider the higher 
level of the row.  Could do the following:
   
    * Make this class extend WholeRowIterator instead.
    * Override the `boolean filter(Text currentRow, List<Key> keys, List<Value> 
values)` method in WholeRowIterator and make it look for the status column and 
check its value to determine whether to keep the row.
    * Change the `configureScanner` method on this class to either set up this 
filter or just a WholeRowIterator w/o filtering when all statuses are 
requested.
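
The row-level decision could look roughly like the sketch below. It only mirrors the shape of the `filter` method named above, using plain strings instead of `Key`/`Value` so it runs with the JDK alone; the column names and the `statusesToAccept` parameter are assumptions for illustration.

```java
import java.util.List;
import java.util.Set;

final class RowStatusFilterSketch {
  // Decides for a whole row: keep it only if its status column holds an accepted status.
  // columns.get(i) names the column of values.get(i), like the parallel key/value lists.
  static boolean filter(List<String> columns, List<String> values, Set<String> statusesToAccept) {
    for (int i = 0; i < columns.size(); i++) {
      if (columns.get(i).equals("status")) {
        return statusesToAccept.contains(values.get(i));
      }
    }
    // No status column in this row, so there is nothing to match against.
    return false;
  }
}
```

Because the decision is made once per row, a reservation value never has to be parsed as a status, so the try/catch around `TStatus.valueOf` would no longer be needed.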



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +322,20 @@ public void setStatus(TStatus status) {
       verifyReserved(true);
 
       try {
-        zk.putPersistentData(getTXPath(fateId), new 
NodeValue(status).serialize(),
-            NodeExistsPolicy.OVERWRITE);
-      } catch (KeeperException | InterruptedException e) {
+        zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+          NodeValue currNodeVal = new NodeValue(currSerializedData);
+          // Ensure the FateId is reserved in ZK, and it is reserved with the 
expected reservation
+          if (currNodeVal.isReservedBy(this.reservation)) {
+            FateReservation currFateReservation = 
currNodeVal.reservation.orElseThrow();
+            FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+            NodeValue newNodeValue = new NodeValue(status, 
currFateReservation, currFateKey);
+            return newNodeValue.serialize();
+          } else {
+            throw new IllegalStateException("Either the FateId is not reserved 
in ZK, or it is"
+                + " but the reservation in ZK differs from that in the 
store.");
+          }
+        });
+      } catch (KeeperException | InterruptedException | 
AcceptableThriftTableOperationException e) {

Review Comment:
   For this one we expect the node to exist in ZooKeeper, so if we see a 
NoNodeException here it's OK to throw an exception.



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -120,35 +127,161 @@ public FateId getFateId() {
   }
 
   @Override
-  protected void create(FateId fateId, FateKey fateKey) {
-    final int maxAttempts = 5;
-
+  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+    final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+    Optional<FateTxStore<T>> txStore = Optional.empty();
+    int maxAttempts = 5;
+    FateMutator.Status status = null;
+
+    // We first need to write the initial/unreserved value for the reservation 
column
+    // Only need to retry if it is UNKNOWN
     for (int attempt = 0; attempt < maxAttempts; attempt++) {
-
-      if (attempt >= 1) {
-        log.debug("Failed to create transaction with fateId {} and fateKey {}, 
trying again",
-            fateId, fateKey);
-        UtilWaitThread.sleep(100);
+      status = newMutator(fateId).putInitReservationVal().tryMutate();
+      if (status != FateMutator.Status.UNKNOWN) {
+        break;
       }
+      UtilWaitThread.sleep(100);
+    }
+    for (int attempt = 0; attempt < maxAttempts; attempt++) {
+      status = 
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
+          
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
+      if (status != FateMutator.Status.UNKNOWN) {
+        break;
+      }
+      UtilWaitThread.sleep(100);
+    }
 
-      var status = 
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
-          .putCreateTime(System.currentTimeMillis()).tryMutate();
+    switch (status) {
+      case ACCEPTED:
+        txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+        break;
+      case REJECTED:
+        // If the status is REJECTED, we need to check what about the mutation 
was REJECTED:
+        // 1) Possible something like the following occurred:
+        // the first attempt was UNKNOWN but written, the next attempt would 
be rejected
+        // We return the FateTxStore in this case.
+        // 2) If there is a collision with existing fate id, throw error
+        // 3) If the fate id is already reserved, return an empty optional
+        // 4) If the fate id is still NEW/unseeded and unreserved, we can try 
to reserve it
+        try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+          scanner.setRange(getRow(fateId));
+          scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+              TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+          scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
+              TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
+          
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+              TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+          TStatus statusSeen = TStatus.UNKNOWN;
+          Optional<FateKey> fateKeySeen = Optional.empty();
+          Optional<FateReservation> reservationSeen = Optional.empty();
+
+          for (Entry<Key,Value> entry : 
scanner.stream().collect(Collectors.toList())) {

Review Comment:
   ```suggestion
             for (Entry<Key,Value> entry : scanner) {
   ```



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -67,27 +76,36 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, 
FateKey fateKey) {
     }
   };
 
-  protected final Set<FateId> reserved;
+  // The ZooKeeper lock for the process that's running this store instance
+  protected final ZooUtil.LockID lockID;
+  protected final Predicate<ZooUtil.LockID> isLockHeld;
   protected final Map<FateId,NanoTime> deferred;
+  protected final FateIdGenerator fateIdGenerator;
   private final int maxDeferred;
   private final AtomicBoolean deferredOverflow = new AtomicBoolean();
-  private final FateIdGenerator fateIdGenerator;
-
-  // This is incremented each time a transaction was unreserved that was non 
new
-  protected final SignalCount unreservedNonNewCount = new SignalCount();
 
   // This is incremented each time a transaction is unreserved that was 
runnable
-  protected final SignalCount unreservedRunnableCount = new SignalCount();
+  private final SignalCount unreservedRunnableCount = new SignalCount();
+
+  // Keeps track of the number of concurrent callers to waitForStatusChange()
+  private final AtomicInteger concurrentStatusChangeCallers = new 
AtomicInteger(0);
 
   public AbstractFateStore() {
-    this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
+    this(null, null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
   }
 
-  public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
+  public AbstractFateStore(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID> 
isLockHeld) {
+    this(lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
+  }
+
+  public AbstractFateStore(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID> 
isLockHeld,
+      int maxDeferred, FateIdGenerator fateIdGenerator) {
     this.maxDeferred = maxDeferred;
     this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
-    this.reserved = new HashSet<>();
-    this.deferred = new HashMap<>();
+    this.deferred = Collections.synchronizedMap(new HashMap<>());
+    this.lockID = Objects.requireNonNullElseGet(lockID, 
AbstractFateStore::createDummyLockID);

Review Comment:
   Falling back to calling createDummyLockID here could hide bugs.  Could 
instead have the caller of this constructor that does not have a lockid call 
`AbstractFateStore.createDummyLockID()`. This removes the ambiguity of whether 
null was intentional or unintentional.  This could be a follow on issue.
   
   ```suggestion
       this.lockID = Objects.requireNonNull(lockID);
   ```
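
To make the difference concrete, here is a small JDK-only sketch. The `String` lock id and `createDummyLockId` are simplified stand-ins for `ZooUtil.LockID` and `AbstractFateStore.createDummyLockID()`; the two `Objects` methods are the real ones.

```java
import java.util.Objects;

final class LockIdNullCheckSketch {
  // Simplified stand-in for AbstractFateStore.createDummyLockID().
  static String createDummyLockId() {
    return "dummy-lock";
  }

  // Current behavior: a null lock id is silently replaced, whether or not null was intended.
  static String lenient(String lockId) {
    return Objects.requireNonNullElseGet(lockId, LockIdNullCheckSketch::createDummyLockId);
  }

  // Suggested behavior: null fails fast, so a caller that wants a dummy id must ask for one.
  static String strict(String lockId) {
    return Objects.requireNonNull(lockId, "lockID");
  }
}
```

A caller without a real lock would then write `strict(createDummyLockId())`, which states the intent at the call site, while an accidental null surfaces immediately as a `NullPointerException`.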



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -421,14 +569,30 @@ private Optional<FateKey> 
deserializeFateKey(DataInputBuffer buffer) throws IOEx
       return Optional.empty();
     }
 
+    private Optional<FateReservation> 
deserializeFateReservation(DataInputBuffer buffer)
+        throws IOException {
+      int length = buffer.readInt();
+      if (length > 0) {

Review Comment:
   If negative values are not expected, could add a check for that.
   
   ```suggestion
         int length = buffer.readInt();
         Preconditions.checkArgument(length >= 0);
         if (length > 0) {
   ```
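
A self-contained sketch of the same check, using `DataInputStream` and a plain `IllegalArgumentException` in place of `DataInputBuffer` and Guava's `Preconditions` so it runs on the JDK alone. The method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

final class LengthPrefixSketch {
  // Reads a length-prefixed field: 0 means "absent", a negative length is rejected.
  static Optional<byte[]> readOptionalField(DataInputStream in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      throw new IllegalArgumentException("negative length " + length);
    }
    if (length == 0) {
      return Optional.empty();
    }
    byte[] data = new byte[length];
    in.readFully(data);
    return Optional.of(data);
  }

  // Convenience wrapper for callers that already hold the serialized bytes.
  static Optional<byte[]> read(byte[] serialized) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
      return readOptionalField(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Without the explicit check, a corrupted negative length would fall through the `length > 0` branch and be treated the same as "no reservation", which hides the corruption.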



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -120,35 +127,161 @@ public FateId getFateId() {
   }
 
   @Override
-  protected void create(FateId fateId, FateKey fateKey) {
-    final int maxAttempts = 5;
-
+  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+    final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+    Optional<FateTxStore<T>> txStore = Optional.empty();
+    int maxAttempts = 5;
+    FateMutator.Status status = null;
+
+    // We first need to write the initial/unreserved value for the reservation 
column
+    // Only need to retry if it is UNKNOWN
     for (int attempt = 0; attempt < maxAttempts; attempt++) {
-
-      if (attempt >= 1) {
-        log.debug("Failed to create transaction with fateId {} and fateKey {}, 
trying again",
-            fateId, fateKey);
-        UtilWaitThread.sleep(100);
+      status = newMutator(fateId).putInitReservationVal().tryMutate();
+      if (status != FateMutator.Status.UNKNOWN) {
+        break;
       }
+      UtilWaitThread.sleep(100);
+    }
+    for (int attempt = 0; attempt < maxAttempts; attempt++) {
+      status = 
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
+          
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
+      if (status != FateMutator.Status.UNKNOWN) {
+        break;
+      }
+      UtilWaitThread.sleep(100);
+    }
 
-      var status = 
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
-          .putCreateTime(System.currentTimeMillis()).tryMutate();
+    switch (status) {
+      case ACCEPTED:
+        txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+        break;
+      case REJECTED:
+        // If the status is REJECTED, we need to check what about the mutation 
was REJECTED:
+        // 1) Possible something like the following occurred:
+        // the first attempt was UNKNOWN but written, the next attempt would 
be rejected
+        // We return the FateTxStore in this case.
+        // 2) If there is a collision with existing fate id, throw error
+        // 3) If the fate id is already reserved, return an empty optional
+        // 4) If the fate id is still NEW/unseeded and unreserved, we can try 
to reserve it
+        try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+          scanner.setRange(getRow(fateId));
+          scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+              TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+          scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
+              TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
+          
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+              TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+          TStatus statusSeen = TStatus.UNKNOWN;
+          Optional<FateKey> fateKeySeen = Optional.empty();
+          Optional<FateReservation> reservationSeen = Optional.empty();
+
+          for (Entry<Key,Value> entry : 
scanner.stream().collect(Collectors.toList())) {
+            Text colf = entry.getKey().getColumnFamily();
+            Text colq = entry.getKey().getColumnQualifier();
+            Value val = entry.getValue();
+
+            switch (colq.toString()) {
+              case TxColumnFamily.STATUS:
+                statusSeen = TStatus.valueOf(val.toString());
+                break;
+              case TxColumnFamily.TX_KEY:
+                fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
+                break;
+              case TxColumnFamily.RESERVATION:
+                if (FateReservation.isFateReservation(val.get())) {
+                  reservationSeen = 
Optional.of(FateReservation.deserialize(val.get()));
+                }
+                break;
+              default:
+                throw new IllegalStateException("Unexpected column seen: " + 
colf + ":" + colq);
+            }
+          }
 
-      switch (status) {
-        case ACCEPTED:
-          return;
-        case UNKNOWN:
-          continue;
-        case REJECTED:
-          throw new IllegalStateException("Attempt to create transaction with 
fateId " + fateId
-              + " and fateKey " + fateKey + " was rejected");
-        default:
-          throw new IllegalStateException("Unknown status " + status);
+          // This will be the case if the mutation status is REJECTED but the 
mutation was written
+          if (statusSeen == TStatus.NEW && reservationSeen.isPresent()
+              && reservationSeen.orElseThrow().equals(reservation)) {
+            verifyFateKey(fateId, fateKeySeen, fateKey);
+            txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+          } else if (statusSeen == TStatus.NEW && reservationSeen.isEmpty()) {
+            verifyFateKey(fateId, fateKeySeen, fateKey);
+            // NEW/unseeded transaction and not reserved, so we can allow it 
to be reserved
+            // we tryReserve() since another thread may have reserved it since 
the scan
+            txStore = tryReserve(fateId);

Review Comment:
   ```suggestion
               txStore = tryReserve(fateId);
               // the status was known before reserving to be NEW, however it 
could change so check after reserving to avoid race conditions.
               var statusAfterReserve = 
txStore.map(ReadOnlyFateTxStore::getStatus).orElse(TStatus.UNKNOWN);
               if (statusAfterReserve != TStatus.NEW) {
                 txStore.ifPresent(txs->txs.unreserve(Duration.ZERO));
                 txStore = Optional.empty();
               }            
   ```



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +322,20 @@ public void setStatus(TStatus status) {
       verifyReserved(true);
 
       try {
-        zk.putPersistentData(getTXPath(fateId), new 
NodeValue(status).serialize(),
-            NodeExistsPolicy.OVERWRITE);
-      } catch (KeeperException | InterruptedException e) {
+        zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+          NodeValue currNodeVal = new NodeValue(currSerializedData);
+          // Ensure the FateId is reserved in ZK, and it is reserved with the 
expected reservation
+          if (currNodeVal.isReservedBy(this.reservation)) {
+            FateReservation currFateReservation = 
currNodeVal.reservation.orElseThrow();
+            FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+            NodeValue newNodeValue = new NodeValue(status, 
currFateReservation, currFateKey);
+            return newNodeValue.serialize();
+          } else {
+            throw new IllegalStateException("Either the FateId is not reserved 
in ZK, or it is"

Review Comment:
   For this exception it would be useful to include the fateId.  In general, 
including the fate id in any exception message could be very helpful for 
debugging problems.  This could be a follow-on issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
