This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new f73af60 Rename CompactionTask to InternalJob (#2270) f73af60 is described below commit f73af60b86e452ef9cd429b56ccc53e93c602eda Author: Mike Miller <mmil...@apache.org> AuthorDate: Thu Sep 16 11:50:25 2021 -0400 Rename CompactionTask to InternalJob (#2270) * Rename CompactionTask to InternalJob to be consistent with ExternalJob * Rename variables to match InternalJob type * Fix some wording in comments * Add javadoc comments --- .../tserver/compactions/CompactionExecutor.java | 6 +++ .../compactions/ExternalCompactionExecutor.java | 35 ++++++------ .../compactions/InternalCompactionExecutor.java | 63 +++++++++++----------- .../accumulo/tserver/compactions/SubmittedJob.java | 3 ++ 4 files changed, 61 insertions(+), 46 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java index 85581a8..0e6e823 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java @@ -24,6 +24,12 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +/** + * A non-pluggable component that executes compactions using multiple threads and has a priority + * queue. There are two types: Internal and External. The {@link InternalCompactionExecutor} runs + * within the Accumulo tserver process. The {@link ExternalCompactionExecutor} runs compactions + * outside the tserver. + */ public interface CompactionExecutor { SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable compactable, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java index 7d2dcd5..7acf20f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java @@ -41,13 +41,16 @@ import org.apache.accumulo.tserver.compactions.SubmittedJob.Status; import com.google.common.base.Preconditions; +/** + * Runs compactions outside the tserver, typically by a process external to Accumulo. + */ public class ExternalCompactionExecutor implements CompactionExecutor { - // This exist to provide an accurate count of queued compactions for metrics. The PriorityQueue is + // This set provides an accurate count of queued compactions for metrics. The PriorityQueue is // not used because its size may be off due to it containing cancelled compactions. The collection // below should not contain cancelled compactions. A concurrent set was not used because those do // not have constant time size operations. - private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new HashSet<>()); + private final Set<ExternalJob> queuedJob = Collections.synchronizedSet(new HashSet<>()); private class ExternalJob extends SubmittedJob { private final AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED); @@ -61,7 +64,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor { super(job); this.compactable = compactable; this.csid = csid; - queuedTask.add(this); + queuedJob.add(this); this.timeCreated = System.currentTimeMillis(); } @@ -83,11 +86,11 @@ public class ExternalCompactionExecutor implements CompactionExecutor { if (expectedStatus == Status.QUEUED) { canceled = status.compareAndSet(expectedStatus, Status.CANCELED); if (canceled) { - queuedTask.remove(this); + queuedJob.remove(this); } if (canceled && cancelCount.incrementAndGet() % 1024 == 0) { - // Occasionally clean the queue of canceled tasks that have hung around because of their + // Occasionally clean the queue of canceled jobs that have hung around because of their // low priority. This runs periodically, instead of every time something is canceled, to // avoid hurting performance. queue.removeIf(ej -> ej.getStatus() == Status.CANCELED); @@ -106,8 +109,8 @@ public class ExternalCompactionExecutor implements CompactionExecutor { } } - private PriorityBlockingQueue<ExternalJob> queue; - private CompactionExecutorId ceid; + private final PriorityBlockingQueue<ExternalJob> queue; + private final CompactionExecutorId ceid; public ExternalCompactionExecutor(CompactionExecutorId ceid) { this.ceid = ceid; @@ -116,8 +119,8 @@ public class ExternalCompactionExecutor implements CompactionExecutor { priorityComparator = priorityComparator.reversed().thenComparingLong(ExternalJob::getTimeCreated); - this.queue = new PriorityBlockingQueue<ExternalJob>(100, - priorityComparator.thenComparing(priorityComparator)); + this.queue = + new PriorityBlockingQueue<>(100, priorityComparator.thenComparing(priorityComparator)); } @Override @@ -140,7 +143,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor { public int getCompactionsQueued(CType ctype) { if (ctype != CType.EXTERNAL) return 0; - return queuedTask.size(); + return queuedJob.size(); } @Override @@ -166,7 +169,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor { if (extJob.getJob().getPriority() >= priority) { if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) { - queuedTask.remove(extJob); + queuedJob.remove(extJob); var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, extJob.getJob(), compactorId, externalCompactionId); if (ecj == null) { @@ -191,7 +194,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor { public Stream<TCompactionQueueSummary> summarize() { HashSet<Short> uniqPrios = new HashSet<Short>(); - queuedTask.forEach(task -> uniqPrios.add(task.getJob().getPriority())); + queuedJob.forEach(job -> uniqPrios.add(job.getJob().getPriority())); Stream<Short> prioStream = uniqPrios.stream(); @@ -212,13 +215,13 @@ public class ExternalCompactionExecutor implements CompactionExecutor { @Override public void compactableClosed(KeyExtent extent) { - List<ExternalJob> taskToCancel; - synchronized (queuedTask) { - taskToCancel = queuedTask.stream().filter(ejob -> ejob.getExtent().equals(extent)) + List<ExternalJob> jobToCancel; + synchronized (queuedJob) { + jobToCancel = queuedJob.stream().filter(ejob -> ejob.getExtent().equals(extent)) .collect(Collectors.toList()); } - taskToCancel.forEach(task -> task.cancel(Status.QUEUED)); + jobToCancel.forEach(job -> job.cancel(Status.QUEUED)); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index cc7b66d..0ff836b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -47,6 +47,9 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +/** + * Runs compactions within the tserver. + */ public class InternalCompactionExecutor implements CompactionExecutor { private static final Logger log = LoggerFactory.getLogger(InternalCompactionExecutor.class); @@ -56,32 +59,32 @@ public class InternalCompactionExecutor implements CompactionExecutor { private AtomicLong cancelCount = new AtomicLong(); private ThreadPoolExecutor threadPool; - // This exist to provide an accurate count of queued compactions for metrics. The PriorityQueue is + // This set provides an accurate count of queued compactions for metrics. The PriorityQueue is // not used because its size may be off due to it containing cancelled compactions. The collection // below should not contain cancelled compactions. A concurrent set was not used because those do // not have constant time size operations. - private Set<CompactionTask> queuedTask = Collections.synchronizedSet(new HashSet<>()); + private final Set<InternalJob> queuedJob = Collections.synchronizedSet(new HashSet<>()); - private AutoCloseable metricCloser; + private final AutoCloseable metricCloser; - private RateLimiter readLimiter; - private RateLimiter writeLimiter; + private final RateLimiter readLimiter; + private final RateLimiter writeLimiter; - private class CompactionTask extends SubmittedJob implements Runnable { + private class InternalJob extends SubmittedJob implements Runnable { - private AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED); - private Compactable compactable; - private CompactionServiceId csid; - private Consumer<Compactable> completionCallback; + private final AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED); + private final Compactable compactable; + private final CompactionServiceId csid; + private final Consumer<Compactable> completionCallback; private final long queuedTime; - public CompactionTask(CompactionJob job, Compactable compactable, CompactionServiceId csid, + public InternalJob(CompactionJob job, Compactable compactable, CompactionServiceId csid, Consumer<Compactable> completionCallback) { super(job); this.compactable = compactable; this.csid = csid; this.completionCallback = completionCallback; - queuedTask.add(this); + queuedJob.add(this); queuedTime = System.currentTimeMillis(); } @@ -90,7 +93,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { try { if (status.compareAndSet(Status.QUEUED, Status.RUNNING)) { - queuedTask.remove(this); + queuedJob.remove(this); compactable.compact(csid, getJob(), readLimiter, writeLimiter, queuedTime); completionCallback.accept(compactable); } @@ -117,24 +120,24 @@ public class InternalCompactionExecutor implements CompactionExecutor { } if (canceled) - queuedTask.remove(this); + queuedJob.remove(this); if (canceled && cancelCount.incrementAndGet() % 1024 == 0) { - // Occasionally clean the queue of canceled tasks that have hung around because of their low + // Occasionally clean the queue of canceled jobs that have hung around because of their low // priority. This runs periodically, instead of every time something is canceled, to avoid // hurting performance. queue.removeIf(runnable -> { - CompactionTask compactionTask; + InternalJob internalJob; if (runnable instanceof TraceRunnable) { runnable = ((TraceRunnable) runnable).getRunnable(); } - if (runnable instanceof CompactionTask) { - compactionTask = (CompactionTask) runnable; + if (runnable instanceof InternalJob) { + internalJob = (InternalJob) runnable; } else { throw new IllegalArgumentException( "Unknown runnable type " + runnable.getClass().getName()); } - return compactionTask.getStatus() == Status.CANCELED; + return internalJob.getStatus() == Status.CANCELED; }); } @@ -151,8 +154,8 @@ public class InternalCompactionExecutor implements CompactionExecutor { return getJob(((TraceRunnable) r).getRunnable()); } - if (r instanceof CompactionTask) { - return ((CompactionTask) r).getJob(); + if (r instanceof InternalJob) { + return ((InternalJob) r).getJob(); } throw new IllegalArgumentException("Unknown runnable type " + r.getClass().getName()); @@ -170,7 +173,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { "compaction." + ceid, queue, OptionalInt.empty(), true); metricCloser = - ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedTask.size()); + ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedJob.size()); this.readLimiter = readLimiter; this.writeLimiter = writeLimiter; @@ -182,9 +185,9 @@ public class InternalCompactionExecutor implements CompactionExecutor { public SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable compactable, Consumer<Compactable> completionCallback) { Preconditions.checkArgument(job.getExecutor().equals(ceid)); - var ctask = new CompactionTask(job, compactable, csid, completionCallback); - threadPool.execute(ctask); - return ctask; + var internalJob = new InternalJob(job, compactable, csid, completionCallback); + threadPool.execute(internalJob); + return internalJob; } public void setThreads(int numThreads) { @@ -216,7 +219,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { public int getCompactionsQueued(CType ctype) { if (ctype != CType.INTERNAL) return 0; - return queuedTask.size(); + return queuedJob.size(); } @Override @@ -232,12 +235,12 @@ public class InternalCompactionExecutor implements CompactionExecutor { @Override public void compactableClosed(KeyExtent extent) { - List<CompactionTask> taskToCancel; - synchronized (queuedTask) { - taskToCancel = queuedTask.stream().filter(ejob -> ejob.getExtent().equals(extent)) + List<InternalJob> jobToCancel; + synchronized (queuedJob) { + jobToCancel = queuedJob.stream().filter(job -> job.getExtent().equals(extent)) .collect(Collectors.toList()); } - taskToCancel.forEach(task -> task.cancel(Status.QUEUED)); + jobToCancel.forEach(job -> job.cancel(Status.QUEUED)); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java index 1335e34..657f8f0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java @@ -20,6 +20,9 @@ package org.apache.accumulo.tserver.compactions; import org.apache.accumulo.core.spi.compaction.CompactionJob; +/** + * A submitted Compaction job, either internal or external. + */ public abstract class SubmittedJob { private final CompactionJob job;