kevinrr888 commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1688552235


##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -106,13 +112,95 @@ public FateId create() {
   @Override
   protected void create(FateId fateId, FateKey key) {
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, key).serialize(),
+      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null, key).serialize(),
           NodeExistsPolicy.FAIL);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException(e);
     }
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+        NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+        // The uuid handles the case where there was a ZK server fault and the write for this thread
+        // went through but that was not acknowledged, and we are reading our own write for 2nd
+        // time.
+        if (!currNodeVal.isReserved() || (currNodeVal.isReserved()
+            && currNodeVal.reservation.orElseThrow().equals(reservation))) {
+          FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+          // Add the FateReservation to the node to reserve
+          return new NodeValue(currNodeVal.status, reservation, currFateKey).serialize();
+        } else {
+          // This will not change the value to null but will return null
+          return null;
+        }
+      });
+      if (newSerNodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    return getNode(fateId).isReserved();
+  }
+
+  @Override
+  public Map<FateId,FateReservation> getActiveReservations() {
+    Map<FateId,FateReservation> activeReservations = new HashMap<>();
+
+    try {
+      for (String strTxId : zk.getChildren(path)) {
+        String txUUIDStr = strTxId.split("_")[1];
+        FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
+        if (isReserved(fateId)) {
+          FateReservation reservation = getNode(fateId).reservation.orElseThrow();
+          activeReservations.put(fateId, reservation);
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    return activeReservations;
+  }
+
+  @Override
+  public void deleteDeadReservations() {
+    for (Map.Entry<FateId,FateReservation> entry : getActiveReservations().entrySet()) {
+      FateId fateId = entry.getKey();
+      FateReservation reservation = entry.getValue();

Review Comment:
   Nice improvement. Added.
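
   For readers following the `tryReserve` hunk above: the code comment there explains that the random UUID lets a retried write recognize its own earlier, unacknowledged write. A minimal sketch of that idea, assuming an in-memory `AtomicReference` as a stand-in for the ZooKeeper node (the class `ReserveSketch` and its method are hypothetical and not part of the PR):

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

public class ReserveSketch {
  // Hypothetical stand-in for the ZK node: null means "not reserved",
  // otherwise it holds the id of the attempt that reserved it.
  private final AtomicReference<UUID> holder = new AtomicReference<>(null);

  // Succeeds if the node is unreserved OR already reserved by this same attempt.
  // The second case is what makes a retry after a lost acknowledgement safe.
  boolean tryReserve(UUID attemptId) {
    while (true) {
      UUID curr = holder.get();
      if (curr != null && !curr.equals(attemptId)) {
        return false; // held by a different attempt
      }
      if (holder.compareAndSet(curr, attemptId)) {
        return true;
      }
      // Another thread changed the value in between; re-read and re-check.
    }
  }

  public static void main(String[] args) {
    ReserveSketch node = new ReserveSketch();
    UUID first = UUID.randomUUID();
    UUID second = UUID.randomUUID();
    System.out.println(node.tryReserve(first));  // first attempt reserves
    System.out.println(node.tryReserve(first));  // retry with the same id still succeeds
    System.out.println(node.tryReserve(second)); // a different attempt is rejected
  }
}
```

   Without a per-attempt id, the retry in the second call would be indistinguishable from a competing reservation and would fail.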



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +316,21 @@ public void setStatus(TStatus status) {
       verifyReserved(true);
 
       try {
-        zk.putPersistentData(getTXPath(fateId), new NodeValue(status).serialize(),
-            NodeExistsPolicy.OVERWRITE);
-      } catch (KeeperException | InterruptedException e) {
+        zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+          NodeValue currNodeVal = new NodeValue(currSerializedData);
+          // Ensure the FateId is reserved in ZK, and it is reserved with the expected reservation
+          if (currNodeVal.isReserved()
+              && currNodeVal.reservation.orElseThrow().equals(this.reservation)) {

Review Comment:
   Yeah, I do that check quite a bit. I think it's a good change and makes the
code clearer. Added.
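
   As an illustration only, the repeated "reserved, and reserved by the expected reservation" check from the hunk above could be folded into a single helper. The sketch below uses a simplified stand-in class; the names `NodeValueSketch` and `isReservedBy` are assumptions for this example and may not match what was actually added in the PR:

```java
import java.util.Optional;
import java.util.UUID;

public class NodeValueSketch {
  // Simplified stand-in: the real NodeValue also carries a status and a FateKey.
  final Optional<UUID> reservation;

  NodeValueSketch(UUID reservation) {
    this.reservation = Optional.ofNullable(reservation);
  }

  boolean isReserved() {
    return reservation.isPresent();
  }

  // Hypothetical helper: true only if a reservation exists and equals the expected one.
  // Replaces the pattern isReserved() && reservation.orElseThrow().equals(expected).
  boolean isReservedBy(UUID expected) {
    return reservation.map(r -> r.equals(expected)).orElse(false);
  }

  public static void main(String[] args) {
    UUID mine = UUID.randomUUID();
    UUID other = UUID.randomUUID();
    System.out.println(new NodeValueSketch(mine).isReservedBy(mine));  // reserved by us
    System.out.println(new NodeValueSketch(other).isReservedBy(mine)); // reserved by someone else
    System.out.println(new NodeValueSketch(null).isReservedBy(mine));  // not reserved at all
  }
}
```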



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
