cshannon commented on code in PR #4042:
URL: https://github.com/apache/accumulo/pull/4042#discussion_r1420657242
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -68,25 +74,128 @@ public class Fate<T> {
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED,
SUCCESSFUL, UNKNOWN);
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ private final BlockingQueue<Long> workQueue;
+ private final SignalCount idleWorkerCount = new SignalCount();
+ private final Thread workFinder;
public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE
}
+ private class SignalCount {
+ long count;
+
+ synchronized void increment() {
+ count++;
+ this.notifyAll();
+ }
+
+ synchronized void decrement() {
+ Preconditions.checkState(count > 0);
+ count--;
+ this.notifyAll();
+ }
+
+ synchronized void waitTillNonZero() {
+ while (count == 0 && keepRunning.get()) {
+ try {
+ wait(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * A single thread that finds transactions to work on and queues them up. Do
not want each worker
+ * thread going to the store and looking for work as it would place more
load on the store.
+ */
+ private class WorkFinder implements Runnable {
+
+ @Override
+ public void run() {
+
+ try {
+
+ while (keepRunning.get()) {
+
+ while (!workQueue.isEmpty() && keepRunning.get()) {
+ // wait till there is at least one worker that is looking for work
and the queue is
+ // empty
+ idleWorkerCount.waitTillNonZero();
+ }
Review Comment:
I have also not heard of a TransferQueue but it does look like it could be
beneficial here, nice suggestion @dlmarion
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -68,25 +74,128 @@ public class Fate<T> {
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED,
SUCCESSFUL, UNKNOWN);
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ private final BlockingQueue<Long> workQueue;
+ private final SignalCount idleWorkerCount = new SignalCount();
+ private final Thread workFinder;
public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE
}
+ private class SignalCount {
+ long count;
+
+ synchronized void increment() {
+ count++;
+ this.notifyAll();
+ }
+
+ synchronized void decrement() {
+ Preconditions.checkState(count > 0);
+ count--;
+ this.notifyAll();
+ }
+
+ synchronized void waitTillNonZero() {
+ while (count == 0 && keepRunning.get()) {
+ try {
+ wait(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * A single thread that finds transactions to work on and queues them up. Do
not want each worker
+ * thread going to the store and looking for work as it would place more
load on the store.
+ */
+ private class WorkFinder implements Runnable {
+
+ @Override
+ public void run() {
+
+ try {
+
+ while (keepRunning.get()) {
+
+ while (!workQueue.isEmpty() && keepRunning.get()) {
+ // wait till there is at least one worker that is looking for work
and the queue is
+ // empty
+ idleWorkerCount.waitTillNonZero();
+ }
Review Comment:
I have also not used a TransferQueue but it does look like it could be
beneficial here, nice suggestion @dlmarion
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]