kevinrr888 commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1700583872


##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -104,19 +109,108 @@ public FateId create() {
   }
 
   @Override
-  protected void create(FateId fateId, FateKey key) {
+  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+    final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, key).serialize(),
-          NodeExistsPolicy.FAIL);
-    } catch (KeeperException | InterruptedException e) {
+      byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId),
+          new NodeValue(TStatus.NEW, reservation, fateKey).serialize(), currSerNodeVal -> {
+            // We are only returning a non-null value for the following cases:
+            // 1) The existing NodeValue for fateId is exactly the same as the value set for the
+            // node if it doesn't yet exist:
+            // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = fateKey
+            // This might occur if there was a ZK server fault and the same write is running a 2nd
+            // time
+            // 2) The existing NodeValue for fateId has:
+            // TStatus = TStatus.NEW, no FateReservation present, FateKey = fateKey
+            // The fateId is NEW/unseeded and not reserved, so we can allow it to be reserved
+            NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+            if (currNodeVal.status == TStatus.NEW && currNodeVal.isReservedBy(reservation)) {
+              verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+              return currSerNodeVal;
+            } else if (currNodeVal.status == TStatus.NEW && !currNodeVal.isReserved()) {
+              verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+              // NEW/unseeded transaction and not reserved, so we can allow it to be reserved
+              return new NodeValue(TStatus.NEW, reservation, fateKey).serialize();
+            } else {
+              log.trace(
+                  "fate id {} tstatus {} fate key {} is reserved {} is either currently reserved "
+                      + "or has already been seeded with work (non-NEW status), or both",
+                  fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null),
+                  currNodeVal.isReserved());
+              // This will not change the value to null but will return null
+              return null;
+            }
+          });
+      if (nodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+        NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+        // The uuid handles the case where there was a ZK server fault and the write for this thread
+        // went through but that was not acknowledged, and we are reading our own write for 2nd
+        // time.
+        if (!currNodeVal.isReserved() || currNodeVal.isReservedBy(reservation)) {
+          FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+          // Add the FateReservation to the node to reserve
+          return new NodeValue(currNodeVal.status, reservation, currFateKey).serialize();
+        } else {
+          // This will not change the value to null but will return null
+          return null;
+        }
+      });
+      if (newSerNodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) {
       throw new IllegalStateException(e);
     }
   }
 
   @Override
-  protected Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId) {
-    final NodeValue node = getNode(fateId);
-    return new Pair<>(node.status, node.fateKey);
+  public void deleteDeadReservations() {
+    for (Map.Entry<FateId,FateReservation> entry : getActiveReservations().entrySet()) {
+      FateId fateId = entry.getKey();
+      FateReservation reservation = entry.getValue();
+      if (isLockHeld.test(reservation.getLockID())) {
+        continue;
+      }
+      try {
+        zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+          NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+          // Make sure the current node is still reserved and reserved with the expected reservation
+          // and it is dead
+          if (currNodeVal.isReservedBy(reservation)
+              && !isLockHeld.test(currNodeVal.reservation.orElseThrow().getLockID())) {
+            // Delete the reservation
+            log.trace("Deleted the dead reservation {} for fate id {}", reservation, fateId);
+            return new NodeValue(currNodeVal.status, null, currNodeVal.fateKey.orElse(null))
+                .serialize();
+          } else {
+            // No change
+            return null;
+          }
+        });
+      } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) {

Review Comment:
   Good catch. Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to