keith-turner commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1671222699


##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -114,38 +131,26 @@ public static Object deserialize(byte[] ser) {
     }
   }
 
-  /**
-   * Attempt to reserve the fate transaction.
-   *
-   * @param fateId The FateId
-   * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
-   *         an empty Optional if the transaction was already reserved.
-   */
-  @Override
-  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
-    synchronized (this) {
-      if (!reserved.contains(fateId)) {
-        return Optional.of(reserve(fateId));
-      }
-      return Optional.empty();
-    }
-  }
-
   @Override
   public FateTxStore<T> reserve(FateId fateId) {
-    synchronized (AbstractFateStore.this) {
-      while (reserved.contains(fateId)) {
-        try {
-          AbstractFateStore.this.wait(100);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IllegalStateException(e);
-        }
+    Preconditions.checkState(!_getStatus(fateId).equals(TStatus.UNKNOWN),
+        "Attempted to reserve a tx that does not exist: " + fateId);

Review Comment:
   A transaction could exist and be reserved when we enter this method, and then
be deleted while we are waiting to reserve it. Also, this check does an RPC to
read data, and then tryReserve does another RPC. For both efficiency and
correctness, it would be better to do this check only when tryReserve returns
empty. If the check is done in the loop, it will handle the case of something
being deleted while waiting, and when tryReserve succeeds the 2nd RPC will never
be made.



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -361,90 +356,108 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
 
   protected abstract Optional<FateKey> getKey(FateId fateId);
 
-  protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);
-
-  protected abstract FateInstanceType getInstanceType();
+  protected abstract FateTxStore<T> newUnreservedFateTxStore(FateId fateId);
 
   protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
     protected final FateId fateId;
-    protected final boolean isReserved;
+    protected boolean deleted;
+    protected FateReservation reservation;
 
     protected TStatus observedStatus = null;
 
-    protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) {
+    protected AbstractFateTxStoreImpl(FateId fateId) {
+      this.fateId = fateId;
+      this.deleted = false;
+      this.reservation = null;
+    }
+
+    protected AbstractFateTxStoreImpl(FateId fateId, FateReservation reservation) {
       this.fateId = fateId;
-      this.isReserved = isReserved;
+      this.deleted = false;
+      this.reservation = Objects.requireNonNull(reservation);
+    }
+
+    protected boolean isReserved() {
+      return this.reservation != null;
     }
 
     @Override
     public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
-      Preconditions.checkState(!isReserved,
-          "Attempted to wait for status change while reserved " + fateId);
-      while (true) {
+      Preconditions.checkState(!isReserved(),
+          "Attempted to wait for status change while reserved: " + fateId);
+      verifyReserved(false);
 
-        long countBefore = unreservedNonNewCount.getCount();
+      int currNumCallers = concurrentStatusChangeCallers.incrementAndGet();

Review Comment:
   This change makes the polling more efficient when many callers are waiting
for a status change. All the code following this should be placed in a
try/finally block, something like the following:
   
   ```java
   try {
      // polling loop
      
   } finally {
      concurrentStatusChangeCallers.decrementAndGet();
   }
   ```
   
   This ensures that it's decremented even when an exception is thrown.
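A rough sketch of that shape, assuming a hypothetical `poll` supplier that returns null until the expected status is seen. The counter name mirrors the PR; scaling the sleep by the number of concurrent callers is only an illustrative backoff policy, not what the PR does:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch; not the PR's implementation.
final class StatusChangeWaiter {
  private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0);

  int currentCallers() {
    return concurrentStatusChangeCallers.get();
  }

  <S> S waitFor(Supplier<S> poll, long baseSleepMillis) {
    int currNumCallers = concurrentStatusChangeCallers.incrementAndGet();
    try {
      // Assumed policy: the more callers polling at once, the longer each one sleeps.
      long sleepMillis = baseSleepMillis * currNumCallers;
      while (true) {
        S result = poll.get();
        if (result != null) {
          return result;
        }
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        }
      }
    } finally {
      // Runs on return and on any exception, so the count never leaks.
      concurrentStatusChangeCallers.decrementAndGet();
    }
  }
}
```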
   
   One thing being lost in this change is the in-memory signalling. It's not
something for this PR, but it would be good to have something that does some
in-memory signalling and stays efficient with the polling as the number of
pollers increases. The lower the latency of this method's implementation, the
faster operations like create table, bulk import, etc. will complete from the
user API perspective, and in-memory signalling can help reduce that latency.
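One possible shape for that, sketched under assumptions: a hypothetical `StatusSignal` that code in the same process calls after writing a status change, while waiters still time out and re-poll so that changes made by other processes are picked up. The version counter guards against missing a signal that arrives between the check and the wait:

```java
import java.util.function.Supplier;

// Hypothetical sketch combining in-memory wakeups with bounded polling.
final class StatusSignal {
  private final Object monitor = new Object();
  private long version = 0; // guarded by monitor

  /** Called by code in this process right after it writes a status change. */
  void signalChange() {
    synchronized (monitor) {
      version++;
      monitor.notifyAll();
    }
  }

  /** Returns once check yields non-null; wakes early on local signals. */
  <S> S await(Supplier<S> check, long maxWaitMillis) {
    while (true) {
      long seen;
      synchronized (monitor) {
        seen = version;
      }
      S result = check.get();
      if (result != null) {
        return result;
      }
      synchronized (monitor) {
        // Skip waiting if a signal arrived after we read 'seen'.
        if (version == seen) {
          try {
            monitor.wait(maxWaitMillis); // timeout covers remote changes
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
          }
        }
      }
    }
  }
}
```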
   
   



##########
core/src/main/java/org/apache/accumulo/core/fate/FateStore.java:
##########
@@ -107,11 +118,155 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
     void unreserve(Duration deferTime);
   }
 
+  /**
+   * The value stored to indicate a FATE transaction ID ({@link FateId}) has been reserved
+   */
+  class FateReservation {
+
+    // The LockID (provided by the Manager running the FATE which uses this store) which is used for
+    // identifying dead Managers, so their reservations can be deleted and picked up again since
+    // they can no longer be worked on.
+    private final ZooUtil.LockID lockID; // TODO 4131 not sure if this is the best type for this
+    // The UUID generated on a reservation attempt (tryReserve()) used to uniquely identify that
+    // attempt. This is useful for the edge case where the reservation is sent to the server
+    // (Tablet Server for UserFateStore and the ZooKeeper Server for MetaFateStore), but the server
+    // dies before the store receives the response. It allows us to determine if the reservation
+    // was successful and was written by this reservation attempt (could have been successfully
+    // reserved by another attempt or not reserved at all, in which case, we wouldn't want to
+    // expose a FateTxStore).
+    private final UUID reservationUUID;
+    private final byte[] serialized;
+    private static final Pattern UUID_PATTERN =
+        Pattern.compile("^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$");
+    private static final Pattern LOCKID_PATTERN = Pattern.compile("^.+/.+\\$[0-9a-fA-F]+$");
+
+    private FateReservation(ZooUtil.LockID lockID, UUID reservationUUID) {
+      this.lockID = Objects.requireNonNull(lockID);
+      this.reservationUUID = Objects.requireNonNull(reservationUUID);
+      this.serialized = serialize(lockID, reservationUUID);
+    }
+
+    public static FateReservation from(ZooUtil.LockID lockID, UUID reservationUUID) {
+      return new FateReservation(lockID, reservationUUID);
+    }
+
+    public static FateReservation from(byte[] serialized) {
+      try (DataInputBuffer buffer = new DataInputBuffer()) {
+        buffer.reset(serialized, serialized.length);
+        ZooUtil.LockID lockID = new ZooUtil.LockID("", buffer.readUTF());
+        UUID reservationUUID = UUID.fromString(buffer.readUTF());
+        return new FateReservation(lockID, reservationUUID);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    public static FateReservation from(String fateReservationStr) {
+      if (isFateReservation(fateReservationStr)) {
+        String[] fields = fateReservationStr.split(":");
+        ZooUtil.LockID lockId = new ZooUtil.LockID("", fields[0]);
+        UUID reservationUUID = UUID.fromString(fields[1]);
+        return new FateReservation(lockId, reservationUUID);
+      } else {
+        throw new IllegalArgumentException(
+            "Tried to create a FateReservation from an invalid string: " + fateReservationStr);
+      }
+    }
+
+    /**
+     *
+     * @param fateReservationStr the string from a call to {@link FateReservation#toString()}
+     * @return true if the string represents a valid FateReservation object, false otherwise
+     */
+    public static boolean isFateReservation(String fateReservationStr) {
+      if (fateReservationStr != null) {
+        String[] fields = fateReservationStr.split(":");
+        if (fields.length == 2) {
+          return LOCKID_PATTERN.matcher(fields[0]).matches()
+              && UUID_PATTERN.matcher(fields[1]).matches();

Review Comment:
   Could use UuidUtil.isUUID here
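For reference, a stdlib-only sketch of the check that `UUID_PATTERN` performs (the canonical 8-4-4-4-12 hex layout). `looksLikeUuid` is a hypothetical name, not Accumulo's `UuidUtil`; the exact signature of `UuidUtil.isUUID` should be confirmed in the Accumulo source before switching:

```java
// Hypothetical helper mirroring UUID_PATTERN; not Accumulo's UuidUtil.
final class UuidCheckSketch {

  static boolean looksLikeUuid(String s) {
    if (s == null || s.length() != 36) {
      return false;
    }
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      boolean hyphenSlot = i == 8 || i == 13 || i == 18 || i == 23;
      if (hyphenSlot) {
        if (c != '-') {
          return false;
        }
      } else if (!isHex(c)) {
        return false;
      }
    }
    return true;
  }

  // ASCII-only, like [0-9a-fA-F] in the regex (Character.digit would accept other scripts).
  private static boolean isHex(char c) {
    return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F');
  }
}
```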



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -106,13 +112,95 @@ public FateId create() {
   @Override
   protected void create(FateId fateId, FateKey key) {
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, key).serialize(),
+      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null, key).serialize(),
           NodeExistsPolicy.FAIL);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException(e);
     }
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+        NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+        // The uuid handles the case where there was a ZK server fault and the write for this thread
+        // went through but that was not acknowledged, and we are reading our own write for 2nd
+        // time.
+        if (!currNodeVal.isReserved() || (currNodeVal.isReserved()
+            && currNodeVal.reservation.orElseThrow().equals(reservation))) {
+          FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+          // Add the FateReservation to the node to reserve
+          return new NodeValue(currNodeVal.status, reservation, currFateKey).serialize();
+        } else {
+          // This will not change the value to null but will return null
+          return null;
+        }
+      });
+      if (newSerNodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    return getNode(fateId).isReserved();
+  }
+
+  @Override
+  public Map<FateId,FateReservation> getActiveReservations() {
+    Map<FateId,FateReservation> activeReservations = new HashMap<>();
+
+    try {
+      for (String strTxId : zk.getChildren(path)) {
+        String txUUIDStr = strTxId.split("_")[1];
+        FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
+        if (isReserved(fateId)) {
+          FateReservation reservation = getNode(fateId).reservation.orElseThrow();
+          activeReservations.put(fateId, reservation);
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    return activeReservations;
+  }
+
+  @Override
+  public void deleteDeadReservations() {
+    for (Map.Entry<FateId,FateReservation> entry : getActiveReservations().entrySet()) {
+      FateId fateId = entry.getKey();
+      FateReservation reservation = entry.getValue();

Review Comment:
   Can avoid calling `zk.mutateExisting` and reading from ZooKeeper when the
lock is still held, with a check like the following:
   
   ```suggestion
         FateReservation reservation = entry.getValue();
         if(isLockHeld(reservation.getLockID())) {
            continue;
         }
   ```
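A small in-memory sketch of that early `continue`, assuming hypothetical `isLockHeld` and `conditionalDelete` functions (the latter standing in for the read-modify-write that `zk.mutateExisting` performs); reservations are simplified to the lock ID string of their holder. Only dead holders cost a storage round trip:

```java
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

// Hypothetical sketch; reservations are reduced to the lock ID string of their holder.
final class DeadReservationSketch {

  /** Returns how many reservations were deleted. */
  static int deleteDeadReservations(Map<String,String> activeReservations,
      Predicate<String> isLockHeld, BiPredicate<String,String> conditionalDelete) {
    int deleted = 0;
    for (Map.Entry<String,String> entry : activeReservations.entrySet()) {
      String lockId = entry.getValue();
      if (isLockHeld.test(lockId)) {
        // Holder is alive: skip without any further reads or writes.
        continue;
      }
      // Only deletes if the tx is still reserved by this same dead holder.
      if (conditionalDelete.test(entry.getKey(), lockId)) {
        deleted++;
      }
    }
    return deleted;
  }
}
```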



##########
core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java:
##########
@@ -175,6 +176,21 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
         }
         return txStore;
       }
+
+      @Override
+      public boolean isReserved(FateId fateId) {
+        return store.isReserved(fateId);
+      }
+
+      @Override
+      public Map<FateId,FateReservation> getActiveReservations() {
+        return store.getActiveReservations();
+      }
+
+      @Override
+      public void deleteDeadReservations() {
+        store.deleteDeadReservations();

Review Comment:
   Probably would not go here, but it would be nice to have consistent logging 
somehow for each dead reservation that is detected and deleted.
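Since `deleteDeadReservations()` returns `void`, a wrapper like this one has nothing to log. One hypothetical option is to have the store return what it deleted so a single place can log every dead reservation consistently. `ReservationCleaner` and the `Consumer<String>` log sink below are illustrative stand-ins, not existing Accumulo types:

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical variant of the store method that reports what it removed.
interface ReservationCleaner {
  Map<String,String> deleteDeadReservations();
}

// Decorator in the spirit of FateLogger: delegates, then logs each deletion.
final class LoggingReservationCleaner implements ReservationCleaner {
  private final ReservationCleaner delegate;
  private final Consumer<String> log;

  LoggingReservationCleaner(ReservationCleaner delegate, Consumer<String> log) {
    this.delegate = delegate;
    this.log = log;
  }

  @Override
  public Map<String,String> deleteDeadReservations() {
    Map<String,String> deleted = delegate.deleteDeadReservations();
    deleted.forEach((fateId, reservation) -> log
        .accept("Deleted dead reservation " + reservation + " for " + fateId));
    return deleted;
  }
}
```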



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -55,7 +60,7 @@
 
 //TODO use zoocache? - ACCUMULO-1297
 //TODO handle zookeeper being down gracefully - ACCUMULO-1297
-
+// TODO 4131 noticed this class is not in the fate.zookeeper package. Should it be?

Review Comment:
   Yeah, it probably should be. That would be consistent with UserFateStore,
which is in a subpackage.



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +316,21 @@ public void setStatus(TStatus status) {
       verifyReserved(true);
 
       try {
-        zk.putPersistentData(getTXPath(fateId), new NodeValue(status).serialize(),
-            NodeExistsPolicy.OVERWRITE);
-      } catch (KeeperException | InterruptedException e) {
+        zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+          NodeValue currNodeVal = new NodeValue(currSerializedData);
+          // Ensure the FateId is reserved in ZK, and it is reserved with the expected reservation
+          if (currNodeVal.isReserved()
+              && currNodeVal.reservation.orElseThrow().equals(this.reservation)) {

Review Comment:
   Not sure if this change is worthwhile. I saw this pattern a few times; we
could create a method in NodeValue and push the check there to shorten the code
elsewhere.
   
   ```suggestion
             if (currNodeVal.isReservedBy(this.reservation)) {
   ```
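A minimal sketch of what such a helper could look like, assuming `NodeValue` keeps its reservation as an `Optional` (as the `reservation.orElseThrow()` calls suggest). `NodeValueSketch` is hypothetical and uses a `String` in place of `FateReservation` so it stands alone:

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for NodeValue; String replaces FateReservation.
final class NodeValueSketch {
  final Optional<String> reservation;

  NodeValueSketch(String reservation) {
    this.reservation = Optional.ofNullable(reservation);
  }

  boolean isReserved() {
    return reservation.isPresent();
  }

  /** True only if reserved, and reserved by exactly the given reservation. */
  boolean isReservedBy(String expected) {
    Objects.requireNonNull(expected);
    return reservation.map(expected::equals).orElse(false);
  }
}
```

With a helper like this, the condition in `tryReserve` could also shrink to `!currNodeVal.isReserved() || currNodeVal.isReservedBy(reservation)`.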



##########
core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java:
##########
@@ -79,6 +82,33 @@ public FateMutator<T> putCreateTime(long ctime) {
     return this;
   }
 
+  @Override
+  public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+    Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+        TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED);
+    mutation.addCondition(condition);
+    TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.toString()));

Review Comment:
   It's nice to avoid calling toString() when its return value is used as
serialized data, because the expectations around toString() are not well
defined. For example, someone could change the toString() implementation to
improve a log message and not realize the implications for serialized data.
   
   ```suggestion
       TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized()));
   ```
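A sketch of keeping the stored bytes independent of `toString()`, assuming an encoding like the one `from(byte[])` above reads back (two `readUTF` fields: lock ID, then UUID). It uses `java.io.DataOutputStream`/`DataInputStream` instead of Hadoop's `DataInputBuffer`, and `ReservationSketch` is a hypothetical stand-in for `FateReservation`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for FateReservation; the lock ID is kept as a plain String.
final class ReservationSketch {
  private final String lockId;
  private final UUID uuid;
  private final byte[] serialized;

  ReservationSketch(String lockId, UUID uuid) {
    this.lockId = Objects.requireNonNull(lockId);
    this.uuid = Objects.requireNonNull(uuid);
    this.serialized = serialize(lockId, uuid);
  }

  private static byte[] serialize(String lockId, UUID uuid) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(lockId);
      out.writeUTF(uuid.toString());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static ReservationSketch from(byte[] serialized) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
      return new ReservationSketch(in.readUTF(), UUID.fromString(in.readUTF()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** The stable, stored form; a defensive copy so callers cannot mutate it. */
  byte[] getSerialized() {
    return serialized.clone();
  }

  @Override
  public String toString() {
    // Free to change for logging; never written to storage.
    return "Reservation[lock=" + lockId + ", uuid=" + uuid + "]";
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ReservationSketch && lockId.equals(((ReservationSketch) o).lockId)
        && uuid.equals(((ReservationSketch) o).uuid);
  }

  @Override
  public int hashCode() {
    return Objects.hash(lockId, uuid);
  }
}
```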



##########
core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java:
##########
@@ -79,6 +82,33 @@ public FateMutator<T> putCreateTime(long ctime) {
     return this;
   }
 
+  @Override
+  public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+    Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+        TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED);
+    mutation.addCondition(condition);
+    TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.toString()));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {
+    Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+        TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(reservation.toString());

Review Comment:
   ```suggestion
           TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(reservation.getSerialized());
   ```
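The reason both sides should use the same encoding: the unreserve condition is an exact byte comparison against whatever the reserve wrote. A minimal in-memory model of that compare-and-set follows; `ConditionalColumnSketch` and its empty `NOT_RESERVED` value are hypothetical (the real `NOT_RESERVED` constant is not shown in this diff):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a conditional mutation on the reservation column.
final class ConditionalColumnSketch {
  static final byte[] NOT_RESERVED = new byte[0]; // assumed placeholder value
  private final Map<String,byte[]> reservationColumn = new HashMap<>();

  /** Writes newValue only if the current value equals expected byte-for-byte. */
  synchronized boolean putIf(String row, byte[] expected, byte[] newValue) {
    byte[] current = reservationColumn.getOrDefault(row, NOT_RESERVED);
    if (!Arrays.equals(current, expected)) {
      return false;
    }
    reservationColumn.put(row, newValue.clone());
    return true;
  }

  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```

If the reserve wrote one encoding and the unreserve compared another, the condition would simply never match and the reservation would be stuck.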



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.