dlmarion commented on code in PR #4042:
URL: https://github.com/apache/accumulo/pull/4042#discussion_r1420554503


##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -68,25 +74,128 @@ public class Fate<T> {
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
 
   private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+  private final BlockingQueue<Long> workQueue;
+  private final SignalCount idleWorkerCount = new SignalCount();

Review Comment:
   I wonder if a `LinkedTransferQueue` would give you the semantics that you
are looking for in a single object. You could use `hasWaitingConsumer()` or
`getWaitingConsumerCount()` to do what you are doing with `SignalCount`. It's
unbounded, but you could limit the number of items you put into the queue
yourself.
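
A minimal sketch of that idea, not code from the PR: the class name `TransferQueueSketch`, the cap `MAX_QUEUED`, and the helper `offerIfRoom` are hypothetical names chosen for illustration. It assumes a single producer thread, since the size check and the `offer` are not atomic together. Note that `size()` and `getWaitingConsumerCount()` on `LinkedTransferQueue` traverse the queue, and the consumer count is only an estimate.

```java
import java.util.concurrent.LinkedTransferQueue;

class TransferQueueSketch {

  // Hypothetical cap on queued transaction ids; the PR does not define this value.
  static final int MAX_QUEUED = 2;

  /**
   * Adds txid only if fewer than MAX_QUEUED items are queued. The check and the
   * offer are not atomic, so this is only safe with a single producer thread.
   */
  static boolean offerIfRoom(LinkedTransferQueue<Long> queue, long txid) {
    if (queue.size() >= MAX_QUEUED) {
      return false;
    }
    return queue.offer(txid);
  }

  public static void main(String[] args) throws Exception {
    LinkedTransferQueue<Long> queue = new LinkedTransferQueue<>();

    // A worker that blocks in take() until an element is handed to it.
    Thread worker = new Thread(() -> {
      try {
        queue.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.start();

    // Poll until the worker is parked in take(); this replaces the SignalCount wait.
    while (!queue.hasWaitingConsumer()) {
      Thread.sleep(10);
    }
    System.out.println("waiting consumers: " + queue.getWaitingConsumerCount());

    // The element goes straight to the waiting worker.
    offerIfRoom(queue, 42L);
    worker.join();
  }
}
```

The queue itself tracks which consumers are blocked, so no separate idle-worker counter has to be kept in sync with it.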



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -68,25 +74,128 @@ public class Fate<T> {
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
 
   private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+  private final BlockingQueue<Long> workQueue;
+  private final SignalCount idleWorkerCount = new SignalCount();
+  private final Thread workFinder;
 
   public enum TxInfo {
     TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE
   }
 
+  private class SignalCount {
+    long count;
+
+    synchronized void increment() {
+      count++;
+      this.notifyAll();
+    }
+
+    synchronized void decrement() {
+      Preconditions.checkState(count > 0);
+      count--;
+      this.notifyAll();
+    }
+
+    synchronized void waitTillNonZero() {
+      while (count == 0 && keepRunning.get()) {
+        try {
+          wait(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * A single thread that finds transactions to work on and queues them up. Do not want each worker
+   * thread going to the store and looking for work as it would place more load on the store.
+   */
+  private class WorkFinder implements Runnable {
+
+    @Override
+    public void run() {
+
+      try {
+
+        while (keepRunning.get()) {
+
+          while (!workQueue.isEmpty() && keepRunning.get()) {
+            // wait till there is at least one worker that is looking for work and the queue is
+            // empty
+            idleWorkerCount.waitTillNonZero();
+          }

Review Comment:
   If `workQueue` were a `LinkedTransferQueue`, then I think this could become:
   ```
     while (!workQueue.hasWaitingConsumer() && keepRunning.get()) {
       Thread.onSpinWait();
     }
   ```
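
For context, here is a self-contained sketch of how such a loop might sit inside a finder/worker arrangement. It is an illustration under assumptions, not the PR's implementation: `WorkFinderSketch`, `run`, and the use of plain counters as stand-ins for transaction ids are all hypothetical. Workers use a timed `poll` so they notice shutdown; per the `TransferQueue` documentation, a thread blocked in `take()` or a timed `poll` counts as a waiting consumer. The finder uses `tryTransfer`, which hands an element over only if a consumer is waiting at that moment and otherwise returns `false` without enqueuing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WorkFinderSketch {

  /** Hands ids 0..total-1 to `workers` threads and returns the ids they processed. */
  static List<Long> run(int workers, long total) {
    LinkedTransferQueue<Long> workQueue = new LinkedTransferQueue<>();
    AtomicBoolean keepRunning = new AtomicBoolean(true);
    List<Long> processed = new CopyOnWriteArrayList<>();

    Thread[] pool = new Thread[workers];
    for (int i = 0; i < workers; i++) {
      pool[i] = new Thread(() -> {
        try {
          while (keepRunning.get()) {
            // Timed poll: the thread counts as a waiting consumer while blocked here.
            Long txid = workQueue.poll(100, TimeUnit.MILLISECONDS);
            if (txid != null) {
              processed.add(txid);
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      pool[i].start();
    }

    // Finder loop, run on the calling thread for simplicity.
    long next = 0;
    while (next < total) {
      while (!workQueue.hasWaitingConsumer() && keepRunning.get()) {
        Thread.onSpinWait();
      }
      // Hand off only if a consumer is still waiting; otherwise spin again.
      if (workQueue.tryTransfer(next)) {
        next++;
      }
    }

    keepRunning.set(false);
    try {
      for (Thread t : pool) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println("processed: " + run(3, 50).size());
  }
}
```

One trade-off to weigh: `Thread.onSpinWait()` keeps the finder thread busy on a core while no worker is idle, whereas the `wait(100)` in `SignalCount` parks the thread between checks.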



