keith-turner commented on code in PR #5263:
URL: https://github.com/apache/accumulo/pull/5263#discussion_r1920697167


##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -198,55 +210,62 @@ private Optional<FateTxStore<T>> reserveFateTx() throws 
InterruptedException {
 
     @Override
     public void run() {
-      while (keepRunning.get()) {
-        FateTxStore<T> txStore = null;
-        ExecutionState state = new ExecutionState();
-        try {
-          var optionalopStore = reserveFateTx();
-          if (optionalopStore.isPresent()) {
-            txStore = optionalopStore.orElseThrow();
-          } else {
-            continue;
-          }
-          state.status = txStore.getStatus();
-          state.op = txStore.top();
-          if (state.status == FAILED_IN_PROGRESS) {
-            processFailed(txStore, state.op);
-          } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) 
{
-            try {
-              execute(txStore, state);
-              if (state.op != null && state.deferTime != 0) {
-                // The current op is not ready to execute
-                continue;
-              }
-            } catch (StackOverflowException e) {
-              // the op that failed to push onto the stack was never executed, 
so no need to undo
-              // it just transition to failed and undo the ops that executed
-              transitionToFailed(txStore, e);
-              continue;
-            } catch (Exception e) {
-              blockIfHadoopShutdown(txStore.getID(), e);
-              transitionToFailed(txStore, e);
+      runningTxRunners.add(this);
+      try {
+        while (keepRunning.get() && !stop.get()) {
+          FateTxStore<T> txStore = null;
+          ExecutionState state = new ExecutionState();
+          try {
+            var optionalopStore = reserveFateTx();
+            if (optionalopStore.isPresent()) {
+              txStore = optionalopStore.orElseThrow();
+            } else {
               continue;
             }
+            state.status = txStore.getStatus();
+            state.op = txStore.top();
+            if (state.status == FAILED_IN_PROGRESS) {
+              processFailed(txStore, state.op);
+            } else if (state.status == SUBMITTED || state.status == 
IN_PROGRESS) {
+              try {
+                execute(txStore, state);
+                if (state.op != null && state.deferTime != 0) {
+                  // The current op is not ready to execute
+                  continue;
+                }
+              } catch (StackOverflowException e) {
+                // the op that failed to push onto the stack was never 
executed, so no need to undo
+                // it just transition to failed and undo the ops that executed
+                transitionToFailed(txStore, e);
+                continue;
+              } catch (Exception e) {
+                blockIfHadoopShutdown(txStore.getID(), e);
+                transitionToFailed(txStore, e);
+                continue;
+              }
 
-            if (state.op == null) {
-              // transaction is finished
-              String ret = state.prevOp.getReturn();
-              if (ret != null) {
-                txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+              if (state.op == null) {
+                // transaction is finished
+                String ret = state.prevOp.getReturn();
+                if (ret != null) {
+                  txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+                }
+                txStore.setStatus(SUCCESSFUL);
+                doCleanUp(txStore);
               }
-              txStore.setStatus(SUCCESSFUL);
-              doCleanUp(txStore);
             }
-          }
-        } catch (Exception e) {
-          runnerLog.error("Uncaught exception in FATE runner thread.", e);
-        } finally {
-          if (txStore != null) {
-            txStore.unreserve(Duration.ofMillis(state.deferTime));
+          } catch (Exception e) {
+            runnerLog.error("Uncaught exception in FATE runner thread.", e);
+          } finally {
+            if (txStore != null) {
+              txStore.unreserve(Duration.ofMillis(state.deferTime));
+            }
           }
         }
+      } finally {
+        log.trace("A TransactionRunner is exiting...");
+        Preconditions.checkState(exited.compareAndSet(false, true));

Review Comment:
   It seems the `exited` atomic boolean could be removed, as it's not being used.
 Removing `this` from `runningTxRunners` in this `finally` block already does what
 `exited` was intended for.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to