This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new ff67c09 Split compact method into two more specific methods (#2263) ff67c09 is described below commit ff67c0948507db573a313da5a71f690a6a92fff9 Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri Sep 10 06:42:24 2021 -0400 Split compact method into two more specific methods (#2263) * Split compact method in CompactionService into to 2 methods: getCompactionPlan() and submitCompactionJob() * Rename compact() to submitCompaction() * Add javadoc comments to various methods * Add final to various private members * Drop redundant enum check and boolean assignment Co-authored-by: Keith Turner <ktur...@apache.org> --- .../client/admin/compaction/CompactableFile.java | 2 ++ .../core/spi/compaction/CompactionPlan.java | 3 ++ .../core/util/compaction/CompactionPlanImpl.java | 8 ++--- .../tserver/compactions/CompactionManager.java | 11 ++++--- .../tserver/compactions/CompactionService.java | 36 ++++++++++++++-------- .../accumulo/tserver/tablet/CompactableImpl.java | 7 ++--- 6 files changed, 42 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java index 3e0a21e..8be4985 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java @@ -23,6 +23,8 @@ import java.net.URI; import org.apache.accumulo.core.metadata.CompactableFileImpl; /** + * A single file ready to compact, that will come in a set of possible candidates. + * * @since 2.1.0 */ public interface CompactableFile { diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java index 592f5b4..410dccd 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java @@ -55,5 +55,8 @@ public interface CompactionPlan { CompactionPlan build(); } + /** + * Return the set of jobs this plan will submit for compaction. + */ Collection<CompactionJob> getJobs(); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java index 111c386..e204cb8 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java @@ -54,11 +54,11 @@ public class CompactionPlanImpl implements CompactionPlan { public static class BuilderImpl implements CompactionPlan.Builder { - private CompactionKind kind; + private final CompactionKind kind; private ArrayList<CompactionJob> jobs = new ArrayList<>(); - private Set<CompactableFile> allFiles; - private Set<CompactableFile> seenFiles = new HashSet<>(); - private Set<CompactableFile> candidates; + private final Set<CompactableFile> allFiles; + private final Set<CompactableFile> seenFiles = new HashSet<>(); + private final Set<CompactableFile> candidates; public BuilderImpl(CompactionKind kind, Set<CompactableFile> allFiles, Set<CompactableFile> candidates) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 5e65f7a..f7c5b2a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -254,7 +254,7 @@ public class CompactionManager { new HashSet<>(runningExternalCompactions.keySet()); for (Compactable compactable : compactables) { last = compactable; - compact(compactable); + submitCompaction(compactable); // remove anything from snapshot that tablets know are running compactable.getExternalCompactionIds(runningEcids::remove); } @@ -267,7 +267,7 @@ public class CompactionManager { compactablesToCheck.poll(maxTimeBetweenChecks - passed, TimeUnit.MILLISECONDS); if (compactable != null) { last = compactable; - compact(compactable); + submitCompaction(compactable); } } @@ -290,7 +290,10 @@ public class CompactionManager { } } - private void compact(Compactable compactable) { + /** + * Get each configured service for the compactable tablet and submit for compaction + */ + private void submitCompaction(Compactable compactable) { for (CompactionKind ctype : CompactionKind.values()) { var csid = compactable.getConfiguredService(ctype); var service = services.get(csid); @@ -308,7 +311,7 @@ public class CompactionManager { } if (service != null) { - service.compact(ctype, compactable, compactablesToCheck::add); + service.submitCompaction(ctype, compactable, compactablesToCheck::add); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index ef19cbf..43f1363 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -223,15 +224,27 @@ public class CompactionService { return true; } - public void compact(CompactionKind kind, Compactable compactable, + /** + * Get compaction plan for the provided compactable tablet and possibly submit for compaction. + * Plans get added to the planning queue before calling the planningExecutor to get the plan. If + * no files are selected, return. Otherwise, submit the compaction job. + */ + public void submitCompaction(CompactionKind kind, Compactable compactable, Consumer<Compactable> completionCallback) { Objects.requireNonNull(compactable); + // add tablet to planning queue and use planningExecutor to get the plan if (queuedForPlanning.get(kind).putIfAbsent(compactable.getExtent(), compactable) == null) { try { planningExecutor.execute(() -> { try { - planCompaction(kind, compactable, completionCallback); + Optional<Compactable.Files> files = compactable.getFiles(myId, kind); + if (files.isEmpty() || files.get().candidates.isEmpty()) { + log.trace("Compactable returned no files {} {}", compactable.getExtent(), kind); + } else { + CompactionPlan plan = getCompactionPlan(kind, files.get(), compactable); + submitCompactionJob(plan, files.get(), compactable, completionCallback); + } } finally { queuedForPlanning.get(kind).remove(compactable.getExtent()); } @@ -305,16 +318,9 @@ public class CompactionService { } } - private void planCompaction(CompactionKind kind, Compactable compactable, - Consumer<Compactable> completionCallback) { - var files = compactable.getFiles(myId, kind); - - if (files.isEmpty() || files.get().candidates.isEmpty()) { - log.trace("Compactable returned no files {} {} {}", compactable.getExtent(), kind, files); - return; - } - - PlanningParameters params = new CpPlanParams(kind, compactable, files.get()); + private CompactionPlan getCompactionPlan(CompactionKind kind, Compactable.Files files, + Compactable compactable) { + PlanningParameters params = new CpPlanParams(kind, compactable, files); log.trace("Planning compactions {} {} {} {}", planner.getClass().getName(), compactable.getExtent(), kind, files); @@ -328,7 +334,11 @@ public class CompactionService { throw e; } - plan = convertPlan(plan, kind, files.get().allFiles, files.get().candidates); + return convertPlan(plan, kind, files.allFiles, files.candidates); + } + + private void submitCompactionJob(CompactionPlan plan, Compactable.Files files, + Compactable compactable, Consumer<Compactable> completionCallback) { // log error if tablet is metadata and compaction is external var execIds = plan.getJobs().stream().map(cj -> (CompactionExecutorIdImpl) cj.getExecutor()); if (compactable.getExtent().isMeta() && execIds.anyMatch(ceid -> ceid.isExternalId())) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 5ab8fbf..e64d53a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -680,7 +680,7 @@ public class CompactableImpl implements Compactable { private void checkifChopComplete(Set<StoredTabletFile> allFiles) { - boolean completed = false; + boolean completed; synchronized (this) { completed = fileMgr.finishChop(allFiles); @@ -847,7 +847,7 @@ public class CompactableImpl implements Compactable { } Pair<Long,CompactionConfig> idAndCfg = null; - if (extKind != null && extKind == CompactionKind.USER) { + if (extKind == CompactionKind.USER) { try { idAndCfg = tablet.getCompactionID(); if (!idAndCfg.getFirst().equals(cid)) { @@ -870,7 +870,6 @@ public class CompactableImpl implements Compactable { } if (extKind != null) { - if (extKind == CompactionKind.USER) { this.chelper = CompactableUtils.getHelper(extKind, tablet, cid, idAndCfg.getSecond()); this.compactionConfig = idAndCfg.getSecond(); @@ -1012,7 +1011,7 @@ public class CompactableImpl implements Compactable { } class CompactionCheck { - private Supplier<Boolean> memoizedCheck; + private final Supplier<Boolean> memoizedCheck; public CompactionCheck(CompactionServiceId service, CompactionKind kind, Long compactionId) { this.memoizedCheck = Suppliers.memoizeWithExpiration(() -> {