kevinrr888 commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1693193627


##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +157,95 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, 
UUID.randomUUID());
+
+    FateMutator.Status status = 
newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the 
mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was 
written. We
+      // need to check if the mutation was written and if it was written by 
this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in 
this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(getRow(fateId));
+        
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+            TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+        FateReservation persistedRes = scanner.stream()
+            .filter(entry -> 
FateReservation.isFateReservation(entry.getValue().toString()))
+            .map(entry -> 
FateReservation.from(entry.getValue().toString())).findFirst()

Review Comment:
   Yes, I was running my new `MultipleStoresIT.testDeadReservationsCleanup()` 
test, which would occasionally fail with my new changes. The test works as 
follows:
   
   1) We have a fate object fate1 which is used to have 4 (the default 
MANAGER_FATE_THREADPOOL_SIZE) workers reserve 4 transactions with lock1
   2) We then simulate that the imaginary manager dies and a new one starts, so 
lock1 is no longer held instead lock2 is now held
   3) A new fate2 is created which spins up 4 more workers which should 
eventually reserve the 4 transactions with lock2 so long as the 
DeadReservationCleaner thread unreserves the transactions reserved with lock1
   
   Took a while to track down, but found that when waiting for the transactions 
to be reserved with lock2, multiple threads would sometimes reserve the same 
transaction, so not all 4 transactions would be reserved causing the test to 
fail. Found that the issue was due to the change from storing an unreserved 
value in the RESERVATION column to just deleting/creating the column on 
unreserve/reserve.
   
   I could not figure out why or how this could occur with these changes. I 
could potentially put a new commit up at some point with these changes to see 
if you could find anything wrong with how I am doing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to