DomGarguilo commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1663115683


##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +157,95 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
+
+    FateMutator.Status status = 
newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the 
mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was 
written. We
+      // need to check if the mutation was written and if it was written by 
this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in 
this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(getRow(fateId));
+        
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+            TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+        FateReservation persistedRes = scanner.stream()
+            .filter(entry -> 
FateReservation.isFateReservation(entry.getValue().toString()))
+            .map(entry -> 
FateReservation.from(entry.getValue().toString())).findFirst()
+            .orElse(null);
+        if (persistedRes != null && persistedRes.equals(reservation)) {
+          return Optional.of(new FateTxStoreImpl(fateId, reservation));
+        }

Review Comment:
   ```suggestion
           return scanner.stream().map(entry -> entry.getValue().toString())
               .filter(FateReservation::isFateReservation).map(FateReservation::from)
               .filter(persistedRes -> persistedRes.equals(reservation)).findFirst()
               .map(persistedRes -> new FateTxStoreImpl(fateId, reservation));
   ```
   
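   These could be combined: a single stream pipeline can find the persisted reservation, compare it to this attempt's reservation, and return the `Optional` directly instead of using a separate `null` check and `if`.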



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -360,7 +380,27 @@ public Fate(T environment, FateStore<T> store, 
Function<Repo<T>,String> toLogStr
         }
       }
     }, 3, SECONDS));
-    this.executor = pool;
+    this.transactionExecutor = pool;
+
+    // Create a dead reservation cleaner for this store that will periodically 
(every 30 seconds)
+    // clean up reservations held by dead processes, if they exist. Only 
created if a dead
+    // reservation cleaner is not already running for the given store type.
+    // TODO 4131 periodic cleanup runs every 30 seconds
+    // Should this be longer? Shorter? A configurable Property? A function of 
something?
+    ScheduledExecutorService deadResCleanerExecutor = 
ThreadPools.getServerThreadPools()
+        .createScheduledExecutorService(1, store.type() + 
"-dead-reservation-cleaner-pool");
+    if ((store.type() == FateInstanceType.USER && 
!userDeadReservationCleanerRunning)
+        || (store.type() == FateInstanceType.META && 
!metaDeadReservationCleanerRunning)) {
+      ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor
+          .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, 
SECONDS);
+      ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
+      if (store.type() == FateInstanceType.USER) {
+        userDeadReservationCleanerRunning = true;
+      } else if (store.type() == FateInstanceType.META) {
+        metaDeadReservationCleanerRunning = true;
+      }
+    }

Review Comment:
   ```suggestion
       boolean isUserStore = store.type() == FateInstanceType.USER;
       boolean isMetaStore = store.type() == FateInstanceType.META;
       if ((isUserStore && !userDeadReservationCleanerRunning)
           || (isMetaStore && !metaDeadReservationCleanerRunning)) {
         ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor
             .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, SECONDS);
         ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
         if (isUserStore) {
           userDeadReservationCleanerRunning = true;
         } else {
           metaDeadReservationCleanerRunning = true;
         }
       }
   ```
   Refactoring out these conditions might help with readability a tiny bit.



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -73,9 +75,12 @@ public class Fate<T> {
   private final FateStore<T> store;
   private final T environment;
   private final ScheduledThreadPoolExecutor fatePoolWatcher;
-  private final ExecutorService executor;
+  private final ExecutorService transactionExecutor;
+  private final ExecutorService deadResCleanerExecutor;
 
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, 
SUCCESSFUL, UNKNOWN);
+  private static boolean userDeadReservationCleanerRunning = false;
+  private static boolean metaDeadReservationCleanerRunning = false;

Review Comment:
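   Should these be Atomic? Or is it fine to keep them as-is? In the constructor, checking these static flags and later setting them to `true` are separate, unsynchronized steps. If two `Fate` instances for the same store type could be constructed concurrently, both might see `false` and each start a cleaner.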



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -360,7 +380,27 @@ public Fate(T environment, FateStore<T> store, 
Function<Repo<T>,String> toLogStr
         }
       }
     }, 3, SECONDS));
-    this.executor = pool;
+    this.transactionExecutor = pool;
+
+    // Create a dead reservation cleaner for this store that will periodically 
(every 30 seconds)
+    // clean up reservations held by dead processes, if they exist. Only 
created if a dead
+    // reservation cleaner is not already running for the given store type.
+    // TODO 4131 periodic cleanup runs every 30 seconds
+    // Should this be longer? Shorter? A configurable Property? A function of 
something?
+    ScheduledExecutorService deadResCleanerExecutor = 
ThreadPools.getServerThreadPools()
+        .createScheduledExecutorService(1, store.type() + 
"-dead-reservation-cleaner-pool");
+    if ((store.type() == FateInstanceType.USER && 
!userDeadReservationCleanerRunning)
+        || (store.type() == FateInstanceType.META && 
!metaDeadReservationCleanerRunning)) {
+      ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor
+          .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, 
SECONDS);
+      ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
+      if (store.type() == FateInstanceType.USER) {
+        userDeadReservationCleanerRunning = true;
+      } else if (store.type() == FateInstanceType.META) {
+        metaDeadReservationCleanerRunning = true;
+      }
+    }
+    this.deadResCleanerExecutor = deadResCleanerExecutor;

Review Comment:
   If the `if` statement above is never entered, will this executor ever be used? If not, I wonder whether we should avoid creating it. This is just a question; I'm not sure what should be done here.



##########
core/src/main/java/org/apache/accumulo/core/fate/FateStore.java:
##########
@@ -107,11 +118,155 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
     void unreserve(Duration deferTime);
   }
 
+  /**
+   * The value stored to indicate a FATE transaction ID ({@link FateId}) has 
been reserved
+   */
+  class FateReservation {
+
+    // The LockID (provided by the Manager running the FATE which uses this 
store) which is used for
+    // identifying dead Managers, so their reservations can be deleted and 
picked up again since
+    // they can no longer be worked on.
+    private final ZooUtil.LockID lockID; // TODO 4131 not sure if this is the 
best type for this
+    // The UUID generated on a reservation attempt (tryReserve()) used to 
uniquely identify that
+    // attempt. This is useful for the edge case where the reservation is sent 
to the server
+    // (Tablet Server for UserFateStore and the ZooKeeper Server for 
MetaFateStore), but the server
+    // dies before the store receives the response. It allows us to determine 
if the reservation
+    // was successful and was written by this reservation attempt (could have 
been successfully
+    // reserved by another attempt or not reserved at all, in which case, we 
wouldn't want to
+    // expose a FateTxStore).
+    private final UUID reservationUUID;
+    private final byte[] serialized;
+    private static final Pattern UUID_PATTERN =
+        
Pattern.compile("^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$");
+    private static final Pattern LOCKID_PATTERN = 
Pattern.compile("^.+/.+\\$[0-9a-fA-F]+$");
+
+    private FateReservation(ZooUtil.LockID lockID, UUID reservationUUID) {
+      this.lockID = Objects.requireNonNull(lockID);
+      this.reservationUUID = Objects.requireNonNull(reservationUUID);
+      this.serialized = serialize(lockID, reservationUUID);
+    }
+
+    public static FateReservation from(ZooUtil.LockID lockID, UUID 
reservationUUID) {
+      return new FateReservation(lockID, reservationUUID);
+    }
+
+    public static FateReservation from(byte[] serialized) {
+      try (DataInputBuffer buffer = new DataInputBuffer()) {
+        buffer.reset(serialized, serialized.length);
+        ZooUtil.LockID lockID = new ZooUtil.LockID("", buffer.readUTF());
+        UUID reservationUUID = UUID.fromString(buffer.readUTF());
+        return new FateReservation(lockID, reservationUUID);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    public static FateReservation from(String fateReservationStr) {
+      if (isFateReservation(fateReservationStr)) {
+        String[] fields = fateReservationStr.split(":");
+        ZooUtil.LockID lockId = new ZooUtil.LockID("", fields[0]);
+        UUID reservationUUID = UUID.fromString(fields[1]);
+        return new FateReservation(lockId, reservationUUID);
+      } else {
+        throw new IllegalArgumentException(
+            "Tried to create a FateReservation from an invalid string: " + 
fateReservationStr);
+      }
+    }
+
+    /**
+     *
+     * @param fateReservationStr the string from a call to {@link 
FateReservation#toString()}
+     * @return true if the string represents a valid FateReservation object, 
false otherwise
+     */
+    public static boolean isFateReservation(String fateReservationStr) {
+      if (fateReservationStr != null) {
+        String[] fields = fateReservationStr.split(":");
+        if (fields.length == 2) {
+          return LOCKID_PATTERN.matcher(fields[0]).matches()
+              && UUID_PATTERN.matcher(fields[1]).matches();
+        }
+      }
+      return false;
+    }
+
+    public ZooUtil.LockID getLockID() {
+      return lockID;
+    }
+
+    public UUID getReservationUUID() {
+      return reservationUUID;
+    }
+
+    public byte[] getSerialized() {
+      return serialized;
+    }
+
+    private static byte[] serialize(ZooUtil.LockID lockID, UUID 
reservationUUID) {
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          DataOutputStream dos = new DataOutputStream(baos)) {
+        dos.writeUTF(lockID.serialize("/"));
+        dos.writeUTF(reservationUUID.toString());
+        dos.close();

Review Comment:
   Is there a reason you are calling `close()` early? If the intent is to make sure the data reaches the underlying `ByteArrayOutputStream`, you could just call `flush()` instead and let the try-with-resources statement close things automatically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to