keith-turner commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1670838100


##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -106,13 +112,95 @@ public FateId create() {
   @Override
   protected void create(FateId fateId, FateKey key) {
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, key).serialize(),
+      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null, key).serialize(),
           NodeExistsPolicy.FAIL);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException(e);
     }
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+        NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+        // The uuid handles the case where there was a ZK server fault and the write for this thread
+        // went through but that was not acknowledged, and we are reading our own write for 2nd
+        // time.
+        if (!currNodeVal.isReserved() || (currNodeVal.isReserved()
+            && currNodeVal.reservation.orElseThrow().equals(reservation))) {
+          FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+          // Add the FateReservation to the node to reserve
+          return new NodeValue(currNodeVal.status, reservation, currFateKey).serialize();
+        } else {
+          // This will not change the value to null but will return null
+          return null;
+        }
+      });
+      if (newSerNodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    return getNode(fateId).isReserved();
+  }
+
+  @Override
+  public Map<FateId,FateReservation> getActiveReservations() {
+    Map<FateId,FateReservation> activeReservations = new HashMap<>();
+
+    try {
+      for (String strTxId : zk.getChildren(path)) {
+        String txUUIDStr = strTxId.split("_")[1];
+        FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
+        if (isReserved(fateId)) {
+          FateReservation reservation = getNode(fateId).reservation.orElseThrow();

Review Comment:
   This sometimes calls `getNode` twice, making two RPCs to ZooKeeper, because `isReserved(fateId)` itself calls `getNode()`. Could refactor this to read from ZooKeeper only once.
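   
   A minimal sketch of the single-read shape, assuming the node can be fetched once and its reservation inspected locally. `NodeSketch`, `SingleReadSketch` and the `readNode` function are hypothetical stand-ins for illustration, not the Accumulo classes or the `zk` API:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;
   import java.util.function.Function;

   // Hypothetical stand-in for the deserialized node value (not Accumulo's NodeValue).
   final class NodeSketch {
     final Optional<String> reservation;

     NodeSketch(Optional<String> reservation) {
       this.reservation = reservation;
     }
   }

   final class SingleReadSketch {
     // Collects reservations using exactly one readNode call per id.
     // readNode stands in for whatever performs the remote read.
     static Map<String,String> activeReservations(Iterable<String> ids,
         Function<String,NodeSketch> readNode) {
       Map<String,String> active = new HashMap<>();
       for (String id : ids) {
         NodeSketch node = readNode.apply(id); // the only read for this id
         // Reuse the value already read instead of asking "is it reserved?" separately.
         node.reservation.ifPresent(res -> active.put(id, res));
       }
       return active;
     }
   }
   ```
   
   Reading once also avoids acting on two different snapshots of the node, since the reservation could change between the two reads.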



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -168,18 +173,16 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {
         var transactions = Stream.concat(inProgress, other);
         transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus()))
             .map(FateIdStatus::getFateId).filter(fateId -> {
-              synchronized (AbstractFateStore.this) {
-                var deferredTime = deferred.get(fateId);
-                if (deferredTime != null) {
-                  if (deferredTime.elapsed().isNegative()) {
-                    // negative elapsed time indicates the deferral time is in the future
-                    return false;
-                  } else {
-                    deferred.remove(fateId);
-                  }
+              var deferredTime = deferred.get(fateId);
+              if (deferredTime != null) {
+                if (deferredTime.elapsed().isNegative()) {
+                  // negative elapsed time indicates the deferral time is in the future
+                  return false;
+                } else {
+                  deferred.remove(fateId);
                 }
-                return !reserved.contains(fateId);
               }
+              return !isReserved(fateId);

Review Comment:
   This will do a scan or ZooKeeper read of data inside a loop that is processing scanned data. So overall, this will conceptually look like the following.
   
   ```java
     for(scan1) {
         // run scan 2 that will run a RPC to a remote system
     }
   ```
   
   In both impls we could avoid this scan in the inner loop by obtaining the information in the outer scan. This would require refactoring `FateIdStatus` to include reservation information. For UserFateStore the original scan could then easily obtain the information. For MetaFateStore it could obtain the status and reservation in the same ZK read as these are both stored in
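   
   A rough sketch of that refactoring, assuming the outer scan can populate the reservation alongside the status. `FateIdStatusSketch` and `RunnableFilterSketch` are hypothetical names used only for illustration, not the existing `FateIdStatus` interface:
   
   ```java
   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Collectors;

   // Hypothetical stand-in for a FateIdStatus that also carries reservation data.
   final class FateIdStatusSketch {
     final String fateId;
     final String status;
     final Optional<String> reservation;

     FateIdStatusSketch(String fateId, String status, Optional<String> reservation) {
       this.fateId = fateId;
       this.status = status;
       this.reservation = reservation;
     }

     // Answered from data the outer scan already fetched; no remote call here.
     boolean isReserved() {
       return reservation.isPresent();
     }
   }

   final class RunnableFilterSketch {
     // Filters already-scanned entries without issuing a second scan per entry.
     static List<String> unreservedIds(List<FateIdStatusSketch> scanned) {
       return scanned.stream()
           .filter(s -> !s.isReserved())
           .map(s -> s.fateId)
           .collect(Collectors.toList());
     }
   }
   ```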



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -298,7 +299,13 @@ private Optional<FateId> create(FateKey fateKey) {
 
   @Override
   public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
-    FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
+    // TODO 4131 not confident about this new implementation of createAndReserve.

Review Comment:
   The way this method previously worked, and the way the code was structured, was driven by the lock object being in AbstractFateStore. Now that there is no longer a lock, we want to implement the following algorithm for both stores using conditional updates (@cshannon you added the fatekey feature, does the following algorithm seem ok to you?).
   
    1. `var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);`
    2. In the store, conditionally create the fateId, setting the status, reservation and fateKey atomically at creation time. The condition should check that nothing currently exists for the fateId; it is expected to be absent.
    3. If the conditional update fails for some reason, read the fateId and see if it already has the expected status, fateKey and reservation. If it does, then this is probably running for a second time and is already reserved. If it does not, then take one of the following actions.
       1. If the fateId exists and has the same fateKey, but a different status or reservation than expected, then return Optional.empty(). This is the case where another thread has already created and reserved using the fateKey, so there is nothing to do.
       2. If the fateId exists and has a different fateKey or no fateKey, then this represents an unexpected collision (and this is unexpected with the 128 bit keys), so throw an exception in this case.
    4. If the conditional update succeeds, then we can return a non-empty Optional w/ a FateTxStore obj.
   
   Now that the lock object is no longer needed in AbstractFateStore, it may make sense to completely push the full implementation of the `createAndReserve` method into MetaFateStore and UserFateStore and have nothing in AbstractFateStore for the method. The reason I am thinking this may be better is that the algorithm above is now based on conditional updates, and how ZK and Accumulo do conditional updates is very different. However, there may still be opportunities to share common code, so I am not sure of the best way to structure the code.
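   
   The steps above can be sketched against an in-memory map, assuming `ConcurrentMap.putIfAbsent` as a stand-in for the store's conditional create. `TxRecordSketch` and `CreateAndReserveSketch` are hypothetical, plain strings replace `FateId`, `FateKey`, `FateReservation` and `FateTxStore`, and neither the ZK nor the Accumulo conditional update API is modelled:
   
   ```java
   import java.util.Objects;
   import java.util.Optional;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Hypothetical record of what is stored per fateId.
   final class TxRecordSketch {
     final String status;
     final String reservation;
     final String fateKey; // may be null when the transaction has no fateKey

     TxRecordSketch(String status, String reservation, String fateKey) {
       this.status = status;
       this.reservation = reservation;
       this.fateKey = fateKey;
     }
   }

   final class CreateAndReserveSketch {
     private final ConcurrentMap<String,TxRecordSketch> store = new ConcurrentHashMap<>();

     // Returns the fateId (standing in for a FateTxStore) when this attempt holds the reservation.
     Optional<String> createAndReserve(String fateId, String fateKey, String reservation) {
       TxRecordSketch expected = new TxRecordSketch("NEW", reservation, fateKey);
       // Step 2: conditional create, which only succeeds when nothing exists for fateId.
       TxRecordSketch existing = store.putIfAbsent(fateId, expected);
       if (existing == null) {
         return Optional.of(fateId); // step 4: the conditional update succeeded
       }
       // Step 3.2: a different or missing fateKey is an unexpected collision.
       if (!Objects.equals(existing.fateKey, fateKey)) {
         throw new IllegalStateException("Unexpected collision for " + fateId);
       }
       // Step 3: everything matches, so an earlier run of this same attempt wrote it.
       if (existing.status.equals(expected.status)
           && Objects.equals(existing.reservation, reservation)) {
         return Optional.of(fateId);
       }
       // Step 3.1: same fateKey, but another attempt created and reserved it.
       return Optional.empty();
     }
   }
   ```
   
   Unlike a real store, the map never reports an unknown outcome, so the re-check branch in this sketch is only reached when the caller retries with the same reservation.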
   
   
   



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -73,9 +75,12 @@ public class Fate<T> {
   private final FateStore<T> store;
   private final T environment;
   private final ScheduledThreadPoolExecutor fatePoolWatcher;
-  private final ExecutorService executor;
+  private final ExecutorService transactionExecutor;
+  private final ExecutorService deadResCleanerExecutor;
 
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
+  private static boolean userDeadReservationCleanerRunning = false;
+  private static boolean metaDeadReservationCleanerRunning = false;

Review Comment:
   Can these static variables be removed? Do they help testing in some way? In the manager we can assume that only a single instance of each type is created. If we wanted to guard against multiple instances per type, then a more general check could be done that is not limited to dead reservation cleaners.
   



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +157,95 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was written. We
+      // need to check if the mutation was written and if it was written by this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(getRow(fateId));
+        scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+            TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+        FateReservation persistedRes = scanner.stream()
+            .filter(entry -> FateReservation.isFateReservation(entry.getValue().toString()))
+            .map(entry -> FateReservation.from(entry.getValue().toString())).findFirst()

Review Comment:
   Given the way the scanner was constructed, would we expect to see anything that is not a reservation? If only reservations are expected, then we could drop the filter and let `FateReservation.from` throw an exception if it sees something that is not a reservation.
   
   ```suggestion
               .map(entry -> FateReservation.from(entry.getValue().toString())).findFirst()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
