keith-turner commented on code in PR #5263:
URL: https://github.com/apache/accumulo/pull/5263#discussion_r1920679386


##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -198,55 +210,62 @@ private Optional<FateTxStore<T>> reserveFateTx() throws 
InterruptedException {
 
     @Override
     public void run() {
-      while (keepRunning.get()) {
-        FateTxStore<T> txStore = null;
-        ExecutionState state = new ExecutionState();
-        try {
-          var optionalopStore = reserveFateTx();
-          if (optionalopStore.isPresent()) {
-            txStore = optionalopStore.orElseThrow();
-          } else {
-            continue;
-          }
-          state.status = txStore.getStatus();
-          state.op = txStore.top();
-          if (state.status == FAILED_IN_PROGRESS) {
-            processFailed(txStore, state.op);
-          } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) 
{
-            try {
-              execute(txStore, state);
-              if (state.op != null && state.deferTime != 0) {
-                // The current op is not ready to execute
-                continue;
-              }
-            } catch (StackOverflowException e) {
-              // the op that failed to push onto the stack was never executed, 
so no need to undo
-              // it just transition to failed and undo the ops that executed
-              transitionToFailed(txStore, e);
-              continue;
-            } catch (Exception e) {
-              blockIfHadoopShutdown(txStore.getID(), e);
-              transitionToFailed(txStore, e);
+      runningTxRunners.add(this);
+      try {
+        while (keepRunning.get() && !stop.get()) {
+          FateTxStore<T> txStore = null;
+          ExecutionState state = new ExecutionState();
+          try {
+            var optionalopStore = reserveFateTx();
+            if (optionalopStore.isPresent()) {
+              txStore = optionalopStore.orElseThrow();
+            } else {
               continue;
             }
+            state.status = txStore.getStatus();
+            state.op = txStore.top();
+            if (state.status == FAILED_IN_PROGRESS) {
+              processFailed(txStore, state.op);
+            } else if (state.status == SUBMITTED || state.status == 
IN_PROGRESS) {
+              try {
+                execute(txStore, state);
+                if (state.op != null && state.deferTime != 0) {
+                  // The current op is not ready to execute
+                  continue;
+                }
+              } catch (StackOverflowException e) {
+                // the op that failed to push onto the stack was never 
executed, so no need to undo
+                // it just transition to failed and undo the ops that executed
+                transitionToFailed(txStore, e);
+                continue;
+              } catch (Exception e) {
+                blockIfHadoopShutdown(txStore.getID(), e);
+                transitionToFailed(txStore, e);
+                continue;
+              }
 
-            if (state.op == null) {
-              // transaction is finished
-              String ret = state.prevOp.getReturn();
-              if (ret != null) {
-                txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+              if (state.op == null) {
+                // transaction is finished
+                String ret = state.prevOp.getReturn();
+                if (ret != null) {
+                  txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+                }
+                txStore.setStatus(SUCCESSFUL);
+                doCleanUp(txStore);
               }
-              txStore.setStatus(SUCCESSFUL);
-              doCleanUp(txStore);
             }
-          }
-        } catch (Exception e) {
-          runnerLog.error("Uncaught exception in FATE runner thread.", e);
-        } finally {
-          if (txStore != null) {
-            txStore.unreserve(Duration.ofMillis(state.deferTime));
+          } catch (Exception e) {
+            runnerLog.error("Uncaught exception in FATE runner thread.", e);
+          } finally {
+            if (txStore != null) {
+              txStore.unreserve(Duration.ofMillis(state.deferTime));
+            }
           }
         }
+      } finally {
+        log.trace("A TransactionRunner is exiting...");
+        Preconditions.checkState(exited.compareAndSet(false, true));
+        Preconditions.checkState(runningTxRunners.remove(this));

Review Comment:
   This will rely on the default Object.equals() code, which is fine because  
exact object equality is the desired behavior.  If going to rely on that could 
take a bit further and make `runningTxRunners` a hash set instead of a list, 
then remove would be really fast.  That would just mean we are relying on the 
default Object.equals() and Object.hashCode() impls.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to