This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f73af60  Rename CompactionTask to InternalJob (#2270)
f73af60 is described below

commit f73af60b86e452ef9cd429b56ccc53e93c602eda
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Thu Sep 16 11:50:25 2021 -0400

    Rename CompactionTask to InternalJob (#2270)
    
    * Rename CompactionTask to InternalJob to be consistent with ExternalJob
    * Rename variables to match InternalJob type
    * Fix some wording in comments
    * Add javadoc comments
---
 .../tserver/compactions/CompactionExecutor.java    |  6 +++
 .../compactions/ExternalCompactionExecutor.java    | 35 ++++++------
 .../compactions/InternalCompactionExecutor.java    | 63 +++++++++++-----------
 .../accumulo/tserver/compactions/SubmittedJob.java |  3 ++
 4 files changed, 61 insertions(+), 46 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
index 85581a8..0e6e823 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
@@ -24,6 +24,12 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 
+/**
+ * A non-pluggable component that executes compactions using multiple threads and has a priority
+ * queue. There are two types: Internal and External. The {@link InternalCompactionExecutor} runs
+ * within the Accumulo tserver process. The {@link ExternalCompactionExecutor} runs compactions
+ * outside the tserver.
+ */
 public interface CompactionExecutor {
 
   SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable compactable,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 7d2dcd5..7acf20f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -41,13 +41,16 @@ import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * Runs compactions outside the tserver, typically by a process external to Accumulo.
+ */
 public class ExternalCompactionExecutor implements CompactionExecutor {
 
-  // This exist to provide an accurate count of queued compactions for metrics. The PriorityQueue is
+  // This set provides an accurate count of queued compactions for metrics. The PriorityQueue is
   // not used because its size may be off due to it containing cancelled compactions. The collection
   // below should not contain cancelled compactions. A concurrent set was not used because those do
   // not have constant time size operations.
-  private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new HashSet<>());
+  private final Set<ExternalJob> queuedJob = Collections.synchronizedSet(new HashSet<>());
 
   private class ExternalJob extends SubmittedJob {
     private final AtomicReference<Status> status = new 
AtomicReference<>(Status.QUEUED);
@@ -61,7 +64,7 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
       super(job);
       this.compactable = compactable;
       this.csid = csid;
-      queuedTask.add(this);
+      queuedJob.add(this);
       this.timeCreated = System.currentTimeMillis();
     }
 
@@ -83,11 +86,11 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
       if (expectedStatus == Status.QUEUED) {
         canceled = status.compareAndSet(expectedStatus, Status.CANCELED);
         if (canceled) {
-          queuedTask.remove(this);
+          queuedJob.remove(this);
         }
 
         if (canceled && cancelCount.incrementAndGet() % 1024 == 0) {
-          // Occasionally clean the queue of canceled tasks that have hung around because of their
+          // Occasionally clean the queue of canceled jobs that have hung around because of their
           // low priority. This runs periodically, instead of every time something is canceled, to
           // avoid hurting performance.
           queue.removeIf(ej -> ej.getStatus() == Status.CANCELED);
@@ -106,8 +109,8 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
     }
   }
 
-  private PriorityBlockingQueue<ExternalJob> queue;
-  private CompactionExecutorId ceid;
+  private final PriorityBlockingQueue<ExternalJob> queue;
+  private final CompactionExecutorId ceid;
 
   public ExternalCompactionExecutor(CompactionExecutorId ceid) {
     this.ceid = ceid;
@@ -116,8 +119,8 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
     priorityComparator =
         priorityComparator.reversed().thenComparingLong(ExternalJob::getTimeCreated);
 
-    this.queue = new PriorityBlockingQueue<ExternalJob>(100,
-        priorityComparator.thenComparing(priorityComparator));
+    this.queue =
+        new PriorityBlockingQueue<>(100, priorityComparator.thenComparing(priorityComparator));
   }
 
   @Override
@@ -140,7 +143,7 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
   public int getCompactionsQueued(CType ctype) {
     if (ctype != CType.EXTERNAL)
       return 0;
-    return queuedTask.size();
+    return queuedJob.size();
   }
 
   @Override
@@ -166,7 +169,7 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
 
       if (extJob.getJob().getPriority() >= priority) {
         if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
-          queuedTask.remove(extJob);
+          queuedJob.remove(extJob);
           var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, 
extJob.getJob(),
               compactorId, externalCompactionId);
           if (ecj == null) {
@@ -191,7 +194,7 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
 
   public Stream<TCompactionQueueSummary> summarize() {
     HashSet<Short> uniqPrios = new HashSet<Short>();
-    queuedTask.forEach(task -> uniqPrios.add(task.getJob().getPriority()));
+    queuedJob.forEach(job -> uniqPrios.add(job.getJob().getPriority()));
 
     Stream<Short> prioStream = uniqPrios.stream();
 
@@ -212,13 +215,13 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
 
   @Override
   public void compactableClosed(KeyExtent extent) {
-    List<ExternalJob> taskToCancel;
-    synchronized (queuedTask) {
-      taskToCancel = queuedTask.stream().filter(ejob -> 
ejob.getExtent().equals(extent))
+    List<ExternalJob> jobToCancel;
+    synchronized (queuedJob) {
+      jobToCancel = queuedJob.stream().filter(ejob -> 
ejob.getExtent().equals(extent))
           .collect(Collectors.toList());
     }
 
-    taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+    jobToCancel.forEach(job -> job.cancel(Status.QUEUED));
   }
 
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index cc7b66d..0ff836b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -47,6 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * Runs compactions within the tserver.
+ */
 public class InternalCompactionExecutor implements CompactionExecutor {
 
   private static final Logger log = 
LoggerFactory.getLogger(InternalCompactionExecutor.class);
@@ -56,32 +59,32 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
   private AtomicLong cancelCount = new AtomicLong();
   private ThreadPoolExecutor threadPool;
 
-  // This exist to provide an accurate count of queued compactions for metrics. The PriorityQueue is
+  // This set provides an accurate count of queued compactions for metrics. The PriorityQueue is
   // not used because its size may be off due to it containing cancelled compactions. The collection
   // below should not contain cancelled compactions. A concurrent set was not used because those do
   // not have constant time size operations.
-  private Set<CompactionTask> queuedTask = Collections.synchronizedSet(new HashSet<>());
+  private final Set<InternalJob> queuedJob = Collections.synchronizedSet(new HashSet<>());
 
-  private AutoCloseable metricCloser;
+  private final AutoCloseable metricCloser;
 
-  private RateLimiter readLimiter;
-  private RateLimiter writeLimiter;
+  private final RateLimiter readLimiter;
+  private final RateLimiter writeLimiter;
 
-  private class CompactionTask extends SubmittedJob implements Runnable {
+  private class InternalJob extends SubmittedJob implements Runnable {
 
-    private AtomicReference<Status> status = new 
AtomicReference<>(Status.QUEUED);
-    private Compactable compactable;
-    private CompactionServiceId csid;
-    private Consumer<Compactable> completionCallback;
+    private final AtomicReference<Status> status = new 
AtomicReference<>(Status.QUEUED);
+    private final Compactable compactable;
+    private final CompactionServiceId csid;
+    private final Consumer<Compactable> completionCallback;
     private final long queuedTime;
 
-    public CompactionTask(CompactionJob job, Compactable compactable, 
CompactionServiceId csid,
+    public InternalJob(CompactionJob job, Compactable compactable, 
CompactionServiceId csid,
         Consumer<Compactable> completionCallback) {
       super(job);
       this.compactable = compactable;
       this.csid = csid;
       this.completionCallback = completionCallback;
-      queuedTask.add(this);
+      queuedJob.add(this);
       queuedTime = System.currentTimeMillis();
     }
 
@@ -90,7 +93,7 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
 
       try {
         if (status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
-          queuedTask.remove(this);
+          queuedJob.remove(this);
           compactable.compact(csid, getJob(), readLimiter, writeLimiter, 
queuedTime);
           completionCallback.accept(compactable);
         }
@@ -117,24 +120,24 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
       }
 
       if (canceled)
-        queuedTask.remove(this);
+        queuedJob.remove(this);
 
       if (canceled && cancelCount.incrementAndGet() % 1024 == 0) {
-        // Occasionally clean the queue of canceled tasks that have hung around because of their low
+        // Occasionally clean the queue of canceled jobs that have hung around because of their low
         // priority. This runs periodically, instead of every time something is canceled, to avoid
         // hurting performance.
         queue.removeIf(runnable -> {
-          CompactionTask compactionTask;
+          InternalJob internalJob;
           if (runnable instanceof TraceRunnable) {
             runnable = ((TraceRunnable) runnable).getRunnable();
           }
-          if (runnable instanceof CompactionTask) {
-            compactionTask = (CompactionTask) runnable;
+          if (runnable instanceof InternalJob) {
+            internalJob = (InternalJob) runnable;
           } else {
             throw new IllegalArgumentException(
                 "Unknown runnable type " + runnable.getClass().getName());
           }
-          return compactionTask.getStatus() == Status.CANCELED;
+          return internalJob.getStatus() == Status.CANCELED;
         });
       }
 
@@ -151,8 +154,8 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
       return getJob(((TraceRunnable) r).getRunnable());
     }
 
-    if (r instanceof CompactionTask) {
-      return ((CompactionTask) r).getJob();
+    if (r instanceof InternalJob) {
+      return ((InternalJob) r).getJob();
     }
 
     throw new IllegalArgumentException("Unknown runnable type " + 
r.getClass().getName());
@@ -170,7 +173,7 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
         "compaction." + ceid, queue, OptionalInt.empty(), true);
 
     metricCloser =
-        ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> 
queuedTask.size());
+        ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> 
queuedJob.size());
 
     this.readLimiter = readLimiter;
     this.writeLimiter = writeLimiter;
@@ -182,9 +185,9 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
   public SubmittedJob submit(CompactionServiceId csid, CompactionJob job, 
Compactable compactable,
       Consumer<Compactable> completionCallback) {
     Preconditions.checkArgument(job.getExecutor().equals(ceid));
-    var ctask = new CompactionTask(job, compactable, csid, completionCallback);
-    threadPool.execute(ctask);
-    return ctask;
+    var internalJob = new InternalJob(job, compactable, csid, 
completionCallback);
+    threadPool.execute(internalJob);
+    return internalJob;
   }
 
   public void setThreads(int numThreads) {
@@ -216,7 +219,7 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
   public int getCompactionsQueued(CType ctype) {
     if (ctype != CType.INTERNAL)
       return 0;
-    return queuedTask.size();
+    return queuedJob.size();
   }
 
   @Override
@@ -232,12 +235,12 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
 
   @Override
   public void compactableClosed(KeyExtent extent) {
-    List<CompactionTask> taskToCancel;
-    synchronized (queuedTask) {
-      taskToCancel = queuedTask.stream().filter(ejob -> 
ejob.getExtent().equals(extent))
+    List<InternalJob> jobToCancel;
+    synchronized (queuedJob) {
+      jobToCancel = queuedJob.stream().filter(job -> 
job.getExtent().equals(extent))
           .collect(Collectors.toList());
     }
 
-    taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+    jobToCancel.forEach(job -> job.cancel(Status.QUEUED));
   }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java
index 1335e34..657f8f0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java
@@ -20,6 +20,9 @@ package org.apache.accumulo.tserver.compactions;
 
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 
+/**
+ * A submitted Compaction job, either internal or external.
+ */
 public abstract class SubmittedJob {
   private final CompactionJob job;
 

Reply via email to