kevinrr888 commented on code in PR #5263:
URL: https://github.com/apache/accumulo/pull/5263#discussion_r1920721262


##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -198,55 +210,62 @@ private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
 
     @Override
     public void run() {
-      while (keepRunning.get()) {
-        FateTxStore<T> txStore = null;
-        ExecutionState state = new ExecutionState();
-        try {
-          var optionalopStore = reserveFateTx();
-          if (optionalopStore.isPresent()) {
-            txStore = optionalopStore.orElseThrow();
-          } else {
-            continue;
-          }
-          state.status = txStore.getStatus();
-          state.op = txStore.top();
-          if (state.status == FAILED_IN_PROGRESS) {
-            processFailed(txStore, state.op);
-          } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
-            try {
-              execute(txStore, state);
-              if (state.op != null && state.deferTime != 0) {
-                // The current op is not ready to execute
-                continue;
-              }
-            } catch (StackOverflowException e) {
-              // the op that failed to push onto the stack was never executed, so no need to undo
-              // it just transition to failed and undo the ops that executed
-              transitionToFailed(txStore, e);
-              continue;
-            } catch (Exception e) {
-              blockIfHadoopShutdown(txStore.getID(), e);
-              transitionToFailed(txStore, e);
+      runningTxRunners.add(this);
+      try {
+        while (keepRunning.get() && !stop.get()) {
+          FateTxStore<T> txStore = null;
+          ExecutionState state = new ExecutionState();
+          try {
+            var optionalopStore = reserveFateTx();
+            if (optionalopStore.isPresent()) {
+              txStore = optionalopStore.orElseThrow();
+            } else {
               continue;
             }
+            state.status = txStore.getStatus();
+            state.op = txStore.top();
+            if (state.status == FAILED_IN_PROGRESS) {
+              processFailed(txStore, state.op);
+            } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
+              try {
+                execute(txStore, state);
+                if (state.op != null && state.deferTime != 0) {
+                  // The current op is not ready to execute
+                  continue;
+                }
+              } catch (StackOverflowException e) {
+                // the op that failed to push onto the stack was never executed, so no need to undo
+                // it just transition to failed and undo the ops that executed
+                transitionToFailed(txStore, e);
+                continue;
+              } catch (Exception e) {
+                blockIfHadoopShutdown(txStore.getID(), e);
+                transitionToFailed(txStore, e);
+                continue;
+              }
 
-            if (state.op == null) {
-              // transaction is finished
-              String ret = state.prevOp.getReturn();
-              if (ret != null) {
-                txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+              if (state.op == null) {
+                // transaction is finished
+                String ret = state.prevOp.getReturn();
+                if (ret != null) {
+                  txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+                }
+                txStore.setStatus(SUCCESSFUL);
+                doCleanUp(txStore);
               }
-              txStore.setStatus(SUCCESSFUL);
-              doCleanUp(txStore);
             }
-          }
-        } catch (Exception e) {
-          runnerLog.error("Uncaught exception in FATE runner thread.", e);
-        } finally {
-          if (txStore != null) {
-            txStore.unreserve(Duration.ofMillis(state.deferTime));
+          } catch (Exception e) {
+            runnerLog.error("Uncaught exception in FATE runner thread.", e);
+          } finally {
+            if (txStore != null) {
+              txStore.unreserve(Duration.ofMillis(state.deferTime));
+            }
           }
         }
+      } finally {
+        log.trace("A TransactionRunner is exiting...");
+        Preconditions.checkState(exited.compareAndSet(false, true));
+        Preconditions.checkState(runningTxRunners.remove(this));

Review Comment:
   Changed to a HashSet.



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -198,55 +210,62 @@ private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
 
     @Override
     public void run() {
-      while (keepRunning.get()) {
-        FateTxStore<T> txStore = null;
-        ExecutionState state = new ExecutionState();
-        try {
-          var optionalopStore = reserveFateTx();
-          if (optionalopStore.isPresent()) {
-            txStore = optionalopStore.orElseThrow();
-          } else {
-            continue;
-          }
-          state.status = txStore.getStatus();
-          state.op = txStore.top();
-          if (state.status == FAILED_IN_PROGRESS) {
-            processFailed(txStore, state.op);
-          } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
-            try {
-              execute(txStore, state);
-              if (state.op != null && state.deferTime != 0) {
-                // The current op is not ready to execute
-                continue;
-              }
-            } catch (StackOverflowException e) {
-              // the op that failed to push onto the stack was never executed, so no need to undo
-              // it just transition to failed and undo the ops that executed
-              transitionToFailed(txStore, e);
-              continue;
-            } catch (Exception e) {
-              blockIfHadoopShutdown(txStore.getID(), e);
-              transitionToFailed(txStore, e);
+      runningTxRunners.add(this);
+      try {
+        while (keepRunning.get() && !stop.get()) {
+          FateTxStore<T> txStore = null;
+          ExecutionState state = new ExecutionState();
+          try {
+            var optionalopStore = reserveFateTx();
+            if (optionalopStore.isPresent()) {
+              txStore = optionalopStore.orElseThrow();
+            } else {
               continue;
             }
+            state.status = txStore.getStatus();
+            state.op = txStore.top();
+            if (state.status == FAILED_IN_PROGRESS) {
+              processFailed(txStore, state.op);
+            } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
+              try {
+                execute(txStore, state);
+                if (state.op != null && state.deferTime != 0) {
+                  // The current op is not ready to execute
+                  continue;
+                }
+              } catch (StackOverflowException e) {
+                // the op that failed to push onto the stack was never executed, so no need to undo
+                // it just transition to failed and undo the ops that executed
+                transitionToFailed(txStore, e);
+                continue;
+              } catch (Exception e) {
+                blockIfHadoopShutdown(txStore.getID(), e);
+                transitionToFailed(txStore, e);
+                continue;
+              }
 
-            if (state.op == null) {
-              // transaction is finished
-              String ret = state.prevOp.getReturn();
-              if (ret != null) {
-                txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+              if (state.op == null) {
+                // transaction is finished
+                String ret = state.prevOp.getReturn();
+                if (ret != null) {
+                  txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+                }
+                txStore.setStatus(SUCCESSFUL);
+                doCleanUp(txStore);
               }
-              txStore.setStatus(SUCCESSFUL);
-              doCleanUp(txStore);
             }
-          }
-        } catch (Exception e) {
-          runnerLog.error("Uncaught exception in FATE runner thread.", e);
-        } finally {
-          if (txStore != null) {
-            txStore.unreserve(Duration.ofMillis(state.deferTime));
+          } catch (Exception e) {
+            runnerLog.error("Uncaught exception in FATE runner thread.", e);
+          } finally {
+            if (txStore != null) {
+              txStore.unreserve(Duration.ofMillis(state.deferTime));
+            }
           }
         }
+      } finally {
+        log.trace("A TransactionRunner is exiting...");
+        Preconditions.checkState(exited.compareAndSet(false, true));

Review Comment:
   Good catch. Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to