This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 955c44a fixed #2019 repopulate selected set for ext compactions w/ other improvements 955c44a is described below commit 955c44a7e409b34f25d9fb5c9786e907083502fa Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Apr 15 20:58:37 2021 -0400 fixed #2019 repopulate selected set for ext compactions w/ other improvements * repopulated set of selected files for external compaction * added lots of sanity checks to existing external compactions on tablet load * added IT to compact 200 tablets externally, test is timing out --- .../schema/ExternalCompactionMetadata.java | 48 ++++- .../coordinator/DeadCompactionDetector.java | 1 + .../accumulo/tserver/tablet/CompactableImpl.java | 231 ++++++++++++++++++--- .../accumulo/tserver/tablet/CompactableUtils.java | 10 +- .../accumulo/tserver/tablet/DatafileManager.java | 16 +- .../apache/accumulo/test/ExternalCompactionIT.java | 64 +++++- 6 files changed, 324 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java index d1d127c..d64a9a1 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java @@ -39,29 +39,42 @@ public class ExternalCompactionMetadata { private static final Gson GSON = new GsonBuilder().create(); private final Set<StoredTabletFile> jobFiles; + private final Set<StoredTabletFile> nextFiles; private final TabletFile compactTmpName; private final TabletFile newFile; private final String compactorId; private final CompactionKind kind; private final long priority; private final CompactionExecutorId ceid; - - public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles, TabletFile compactTmpName, - TabletFile newFile, String compactorId, CompactionKind kind, long priority, - CompactionExecutorId ceid) { + private final boolean propogateDeletes; + private final boolean selectedAll; + private final Long compactionId; + + public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles, Set<StoredTabletFile> nextFiles, + TabletFile compactTmpName, TabletFile newFile, String compactorId, CompactionKind kind, + long priority, CompactionExecutorId ceid, boolean propogateDeletes, boolean selectedAll, + Long compactionId) { this.jobFiles = Objects.requireNonNull(jobFiles); + this.nextFiles = Objects.requireNonNull(nextFiles); this.compactTmpName = Objects.requireNonNull(compactTmpName); this.newFile = Objects.requireNonNull(newFile); this.compactorId = Objects.requireNonNull(compactorId); this.kind = Objects.requireNonNull(kind); this.priority = priority; this.ceid = Objects.requireNonNull(ceid); + this.propogateDeletes = propogateDeletes; + this.selectedAll = selectedAll; + this.compactionId = compactionId; } public Set<StoredTabletFile> getJobFiles() { return jobFiles; } + public Set<StoredTabletFile> getNextFiles() { + return nextFiles; + } + public TabletFile getCompactTmpName() { return compactTmpName; } @@ -86,37 +99,62 @@ public class ExternalCompactionMetadata { return ceid; } + public boolean isPropogateDeletes() { + return propogateDeletes; + } + + public boolean isSelectedAll() { + return selectedAll; + } + + public Long getCompactionId() { + return compactionId; + } + // This class is used to serialize and deserialize this class using GSon. Any changes to this // class must consider persisted data. private static class GSonData { List<String> inputs; + List<String> nextFiles; String tmp; String dest; String compactor; String kind; String executorId; long priority; + boolean propDels; + boolean selectedAll; + Long compactionId; } public String toJson() { GSonData jData = new GSonData(); + jData.inputs = jobFiles.stream().map(StoredTabletFile::getMetaUpdateDelete).collect(toList()); + jData.nextFiles = + nextFiles.stream().map(StoredTabletFile::getMetaUpdateDelete).collect(toList()); jData.tmp = compactTmpName.getMetaInsert(); jData.dest = newFile.getMetaInsert(); jData.compactor = compactorId; jData.kind = kind.name(); jData.executorId = ceid.getExernalName(); jData.priority = priority; + jData.propDels = propogateDeletes; + jData.selectedAll = selectedAll; + jData.compactionId = compactionId; return GSON.toJson(jData); } public static ExternalCompactionMetadata fromJson(String json) { GSonData jData = GSON.fromJson(json, GSonData.class); + return new ExternalCompactionMetadata( jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()), + jData.nextFiles.stream().map(StoredTabletFile::new).collect(toSet()), new TabletFile(new Path(jData.tmp)), new TabletFile(new Path(jData.dest)), jData.compactor, CompactionKind.valueOf(jData.kind), jData.priority, - CompactionExecutorId.externalId(jData.executorId)); + CompactionExecutorId.externalId(jData.executorId), jData.propDels, jData.selectedAll, + jData.compactionId); } @Override diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index 1feb7f5..61e27c6 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -125,6 +125,7 @@ public class DeadCompactionDetector { danglingEcids.forEach( ecid -> log.debug("Detected dangling external compaction final state marker {}", ecid)); + // todo add logging in impl context.getAmple().deleteExternalCompactionFinalStates(danglingEcids); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 177358c..f74a060 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -22,16 +22,20 @@ import static org.apache.accumulo.tserver.TabletStatsKeeper.Operation.MAJOR; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Supplier; @@ -58,6 +62,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.spi.compaction.CompactionServices; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.compaction.CompactionJobImpl; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServiceEnvironmentImpl; @@ -69,6 +74,7 @@ import org.apache.accumulo.tserver.compactions.CompactionManager; import org.apache.accumulo.tserver.compactions.ExternalCompactionJob; import org.apache.accumulo.tserver.managermessage.TabletStatusMessage; import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,23 +157,42 @@ public class CompactableImpl implements Compactable { var dataFileSizes = tablet.getDatafileManager().getDatafileSizes(); + Map<ExternalCompactionId,String> extCompactionsToRemove = new HashMap<>(); + + initializeSelection(extCompactions, tablet, extCompactionsToRemove); + + sanityCheckExternalCompactions(extCompactions, dataFileSizes.keySet(), extCompactionsToRemove); + + extCompactionsToRemove.forEach((ecid, reason) -> { + log.warn("Removing external compaction {} for {} because {} meta: {}", ecid, + tablet.getExtent(), reason, extCompactions.get(ecid).toJson()); + }); + + if (!extCompactionsToRemove.isEmpty()) { + var tabletMutator = tablet.getContext().getAmple().mutateTablet(tablet.getExtent()); + extCompactionsToRemove.keySet().forEach(tabletMutator::deleteExternalCompaction); + tabletMutator.mutate(); + } + extCompactions.forEach((ecid, ecMeta) -> { - // CBUG ensure files for each external compaction are disjoint - allCompactingFiles.addAll(ecMeta.getJobFiles()); - Collection<CompactableFile> files = ecMeta.getJobFiles().stream() - .map(f -> new CompactableFileImpl(f, dataFileSizes.get(f))).collect(Collectors.toList()); - CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), - ecMeta.getCompactionExecutorId(), files, ecMeta.getKind()); - runnningJobs.add(job); + if (!extCompactionsToRemove.containsKey(ecid)) { + allCompactingFiles.addAll(ecMeta.getJobFiles()); + Collection<CompactableFile> files = + ecMeta.getJobFiles().stream().map(f -> new CompactableFileImpl(f, dataFileSizes.get(f))) + .collect(Collectors.toList()); + CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), + ecMeta.getCompactionExecutorId(), files, ecMeta.getKind()); + runnningJobs.add(job); - ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - ecInfo.job = job; - ecInfo.meta = ecMeta; - externalCompactions.put(ecid, ecInfo); + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + ecInfo.job = job; + ecInfo.meta = ecMeta; + externalCompactions.put(ecid, ecInfo); - log.debug("Loaded tablet {} has existing external compaction {} {}", getExtent(), ecid, - ecMeta); - manager.registerExternalCompaction(ecid, getExtent()); + log.debug("Loaded tablet {} has existing external compaction {} {}", getExtent(), ecid, + ecMeta); + manager.registerExternalCompaction(ecid, getExtent()); + } }); compactionRunning = !allCompactingFiles.isEmpty(); @@ -181,6 +206,29 @@ public class CompactableImpl implements Compactable { }, 2, TimeUnit.SECONDS); } + private void sanityCheckExternalCompactions( + Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions, + Set<StoredTabletFile> tabletFiles, Map<ExternalCompactionId,String> extCompactionsToRemove) { + + Set<StoredTabletFile> seen = new HashSet<>(); + AtomicBoolean overlap = new AtomicBoolean(false); + + extCompactions.forEach((ecid, ecMeta) -> { + if (!tabletFiles.containsAll(ecMeta.getJobFiles())) { + extCompactionsToRemove.putIfAbsent(ecid, "Has files outside of tablet files"); + } else if (!Collections.disjoint(seen, ecMeta.getJobFiles())) { + overlap.set(true); + } + }); + + if (overlap.get()) { + extCompactions.keySet().forEach(ecid -> { + extCompactionsToRemove.putIfAbsent(ecid, "Some external compaction files overlap"); + }); + } + + } + void initiateChop() { Set<StoredTabletFile> allFiles = tablet.getDatafiles().keySet(); @@ -321,6 +369,131 @@ public class CompactableImpl implements Compactable { } } + private void initializeSelection( + Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions, Tablet tablet, + Map<ExternalCompactionId,String> externalCompactionsToRemove) { + CompactionKind extKind = null; + boolean unexpectedExternal = false; + Set<StoredTabletFile> tmpSelectedFiles = null; + Boolean selAll = null; + Long cid = null; + Boolean propDel = null; + int count = 0; + + ArrayList<String> reasons = new ArrayList<>(); + + for (Entry<ExternalCompactionId,ExternalCompactionMetadata> entry : extCompactions.entrySet()) { + var ecMeta = entry.getValue(); + + // CBUG what about chop? + if (ecMeta.getKind() != CompactionKind.USER && ecMeta.getKind() != CompactionKind.SELECTOR) { + continue; + } + + count++; + + if (extKind == null || extKind == ecMeta.getKind()) { + extKind = ecMeta.getKind(); + } else { + reasons.add("Saw USER and SELECTOR"); + unexpectedExternal = true; + break; + } + + if (tmpSelectedFiles == null) { + tmpSelectedFiles = Sets.union(ecMeta.getJobFiles(), ecMeta.getNextFiles()); + } else if (!Sets.union(ecMeta.getJobFiles(), ecMeta.getNextFiles()) + .equals(tmpSelectedFiles)) { + reasons.add("Selected set of files differs"); + unexpectedExternal = true; + break; + } + + if (selAll == null) { + selAll = ecMeta.isSelectedAll(); + } else if (selAll != ecMeta.isSelectedAll()) { + unexpectedExternal = true; + reasons.add("Disagreement on selectedAll"); + break; + } + + if (ecMeta.getKind() == CompactionKind.USER) { + if (ecMeta.getCompactionId() == null) { + unexpectedExternal = true; + reasons.add("Missing compactionId"); + break; + } else if (cid == null) { + cid = ecMeta.getCompactionId(); + } else if (!cid.equals(ecMeta.getCompactionId())) { + unexpectedExternal = true; + reasons.add("Disagreement on compactionId"); + break; + } + } else if (ecMeta.getCompactionId() != null) { + unexpectedExternal = true; + reasons.add("Unexpected compactionId"); + break; + } + + if (propDel == null) { + propDel = ecMeta.isPropogateDeletes(); + } else if (propDel != ecMeta.isPropogateDeletes()) { + unexpectedExternal = true; + reasons.add("Disagreement on propogateDeletes"); + break; + } + + } + + if (propDel != null && !propDel && count > 1) { + unexpectedExternal = true; + reasons.add("Concurrent compactions not propogatingDeletes"); + } + + Pair<Long,CompactionConfig> idAndCfg = null; + if (extKind != null && extKind == CompactionKind.USER) { + try { + idAndCfg = tablet.getCompactionID(); + if (!idAndCfg.getFirst().equals(cid)) { + unexpectedExternal = true; + reasons.add("Compaction id mismatch with zookeeper"); + } + } catch (NoNodeException e) { + unexpectedExternal = true; + reasons.add("No compaction id in zookeeper"); + } + } + + if (unexpectedExternal) { + String reason = reasons.toString(); + extCompactions.entrySet().stream().filter(e -> { + var kind = e.getValue().getKind(); + return kind == CompactionKind.SELECTOR || kind == CompactionKind.USER; + }).map(Entry::getKey).forEach(ecid -> externalCompactionsToRemove.putIfAbsent(ecid, reason)); + return; + } + + if (extKind != null) { + + if (extKind == CompactionKind.USER) { + this.chelper = CompactableUtils.getHelper(extKind, tablet, cid, idAndCfg.getSecond()); + this.compactionConfig = idAndCfg.getSecond(); + this.compactionId = cid; + } else if (extKind == CompactionKind.SELECTOR) { + this.chelper = CompactableUtils.getHelper(extKind, tablet, null, null); + } + + this.selectedFiles.clear(); + this.selectedFiles.addAll(tmpSelectedFiles); + this.selectKind = extKind; + this.selectedAll = selAll; + this.selectStatus = SpecialStatus.SELECTED; + + log.debug("Selected compaction status initialized from external compactions {} {} {} {}", + getExtent(), selectStatus, selectedAll, asFileNames(selectedFiles)); + } + } + private void initiateSelection(CompactionKind kind, Long compactionId, CompactionConfig compactionConfig) { Preconditions.checkArgument(kind == CompactionKind.USER || kind == CompactionKind.SELECTOR); @@ -585,12 +758,13 @@ public class CompactableImpl implements Compactable { private static class CompactionInfo { Set<StoredTabletFile> jobFiles; - Long compactionId = null; Long checkCompactionId = null; boolean propogateDeletes = true; CompactionHelper localHelper; List<IteratorSetting> iters = List.of(); CompactionConfig localCompactionCfg; + boolean selectedAll; + Set<StoredTabletFile> selectedFiles; } private CompactionInfo reserveFilesForCompaction(CompactionServiceId service, CompactionJob job) { @@ -667,6 +841,10 @@ public class CompactableImpl implements Compactable { && cInfo.jobFiles.containsAll(selectedFiles)) { cInfo.propogateDeletes = false; } + + cInfo.selectedFiles = Set.copyOf(selectedFiles); + cInfo.selectedAll = selectedAll; + break; default: if (((CompactionJobImpl) job).selectedAll()) { @@ -674,11 +852,9 @@ public class CompactableImpl implements Compactable { // dropped. cInfo.propogateDeletes = false; } - } - if (job.getKind() == CompactionKind.USER && selectKind == job.getKind() - && selectedFiles.equals(cInfo.jobFiles)) { - cInfo.compactionId = this.compactionId; + // CBUG what about chop? + cInfo.selectedFiles = Set.of(); } if (job.getKind() == CompactionKind.USER) { @@ -735,8 +911,8 @@ public class CompactableImpl implements Compactable { TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg); tablet.incrementStatusMajor(); - metaFile = CompactableUtils.compact(tablet, job, cInfo.jobFiles, cInfo.compactionId, - cInfo.propogateDeletes, cInfo.localHelper, cInfo.iters, + metaFile = CompactableUtils.compact(tablet, job, cInfo.jobFiles, cInfo.checkCompactionId, + cInfo.selectedFiles, cInfo.propogateDeletes, cInfo.localHelper, cInfo.iters, new CompactionCheck(service, job.getKind(), cInfo.checkCompactionId), readLimiter, writeLimiter, stats); @@ -771,8 +947,10 @@ public class CompactableImpl implements Compactable { ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles, compactTmpName, newFile, - compactorId, job.getKind(), job.getPriority(), job.getExecutor()); + ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles, + Sets.difference(cInfo.selectedFiles, cInfo.jobFiles), compactTmpName, newFile, + compactorId, job.getKind(), job.getPriority(), job.getExecutor(), cInfo.propogateDeletes, + cInfo.selectedAll, cInfo.checkCompactionId); tablet.getContext().getAmple().mutateTablet(getExtent()) .putExternalCompaction(externalCompactionId, ecInfo.meta).mutate(); @@ -780,8 +958,6 @@ public class CompactableImpl implements Compactable { externalCompactions.put(externalCompactionId, ecInfo); - // CBUG because this is an RPC the return may never get to the caller... however the caller - // may be alive.... maybe the caller can set the externalCompactionId it working on in ZK return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, compactTmpName, getExtent(), externalCompactionId, job.getPriority(), job.getKind(), cInfo.iters); @@ -814,9 +990,12 @@ public class CompactableImpl implements Compactable { // TODO do a sanity check that files exists in dfs? StoredTabletFile metaFile = null; try { + // possibly do some sanity checks here metaFile = tablet.getDatafileManager().bringMajorCompactionOnline( ecInfo.meta.getJobFiles(), ecInfo.meta.getCompactTmpName(), ecInfo.meta.getNewFile(), - compactionId, new DataFileValue(fileSize, entries), Optional.of(extCompactionId)); + ecInfo.meta.getCompactionId(), + Sets.union(ecInfo.meta.getJobFiles(), ecInfo.meta.getNextFiles()), + new DataFileValue(fileSize, entries), Optional.of(extCompactionId)); TabletLogger.compacted(getExtent(), ecInfo.job, metaFile); } catch (Exception e) { metaFile = null; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index f5b0b5e..45cf61e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -536,10 +536,10 @@ public class CompactableUtils { } static StoredTabletFile compact(Tablet tablet, CompactionJob job, Set<StoredTabletFile> jobFiles, - Long compactionId, boolean propogateDeletes, CompactableImpl.CompactionHelper helper, - List<IteratorSetting> iters, CompactionCheck compactionCheck, RateLimiter readLimiter, - RateLimiter writeLimiter, CompactionStats stats) - throws IOException, CompactionCanceledException { + Long compactionId, Set<StoredTabletFile> selectedFiles, boolean propogateDeletes, + CompactableImpl.CompactionHelper helper, List<IteratorSetting> iters, + CompactionCheck compactionCheck, RateLimiter readLimiter, RateLimiter writeLimiter, + CompactionStats stats) throws IOException, CompactionCanceledException { StoredTabletFile metaFile; CompactionEnv cenv = new CompactionEnv() { @Override @@ -615,7 +615,7 @@ public class CompactableUtils { stats.add(mcs); metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(compactFiles.keySet(), - compactTmpName, newFile, compactionId, + compactTmpName, newFile, compactionId, selectedFiles, new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()), Optional.empty()); return metaFile; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 56483bc..b0fb60d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -398,8 +398,9 @@ class DatafileManager { } StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles, - TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, DataFileValue dfv, - Optional<ExternalCompactionId> ecid) throws IOException { + TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, + Set<StoredTabletFile> selectedFiles, DataFileValue dfv, Optional<ExternalCompactionId> ecid) + throws IOException { final KeyExtent extent = tablet.getExtent(); VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager(); long t1, t2; @@ -420,6 +421,9 @@ class DatafileManager { TServerInstance lastLocation = null; // calling insert to get the new file before inserting into the metadata StoredTabletFile newFile = newDatafile.insert(); + + Long compactionIdToWrite = null; + synchronized (tablet) { t1 = System.currentTimeMillis(); @@ -445,6 +449,10 @@ class DatafileManager { lastLocation = tablet.resetLastLocation(); + if (compactionId != null && Collections.disjoint(selectedFiles, datafileSizes.keySet())) { + compactionIdToWrite = compactionId; + } + t2 = System.currentTimeMillis(); } @@ -453,10 +461,10 @@ class DatafileManager { if (!filesInUseByScans.isEmpty()) log.debug("Adding scan refs to metadata {} {}", extent, filesInUseByScans); ManagerMetadataUtil.replaceDatafiles(tablet.getContext(), extent, oldDatafiles, - filesInUseByScans, newFile, compactionId, dfv, + filesInUseByScans, newFile, compactionIdToWrite, dfv, tablet.getTabletServer().getClientAddressString(), lastLocation, tablet.getTabletServer().getLock(), ecid); - tablet.setLastCompactionID(compactionId); + tablet.setLastCompactionID(compactionIdToWrite); removeFilesAfterScan(filesInUseByScans); if (log.isTraceEnabled()) { diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index 9ce8ea9..fc61228 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -60,7 +60,6 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; -import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; @@ -81,6 +80,12 @@ public class ExternalCompactionIT extends ConfigurableMacBase { private static final Logger LOG = LoggerFactory.getLogger(ExternalCompactionIT.class); + private static final int MAX_DATA = 1000; + + private static String row(int r) { + return String.format("r:%04d", r); + } + @Override protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { cfg.setProperty("tserver.compaction.major.service.cs1.planner", @@ -151,7 +156,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { verify(client, table1, 2); SortedSet<Text> splits = new TreeSet<>(); - splits.add(new Text("r:4")); + splits.add(new Text(row(MAX_DATA / 2))); client.tableOperations().addSplits(table2, splits); compact(client, table2, 3, "DCQ2", true); @@ -160,8 +165,44 @@ public class ExternalCompactionIT extends ConfigurableMacBase { } } + // test times out, need to improve how a single tablet w/ lots of compactions is handled... + // currently waits 1 min between each compaction per tserver + @Ignore @Test - //@Ignore // waiting for solution to issue #2019 + public void testManytablets() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + String table1 = "ectt4"; + + SortedSet<Text> splits = new TreeSet<>(); + int jump = MAX_DATA / 200; + + for (int r = jump; r < MAX_DATA; r += jump) { + splits.add(new Text(row(r))); + } + + createTable(client, table1, "cs1", splits); + + writeData(client, table1); + + cluster.exec(Compactor.class, "-q", "DCQ1"); + cluster.exec(Compactor.class, "-q", "DCQ1"); + cluster.exec(Compactor.class, "-q", "DCQ1"); + cluster.exec(Compactor.class, "-q", "DCQ1"); + cluster.exec(CompactionCoordinator.class); + + compact(client, table1, 3, "DCQ1", true); + + verify(client, table1, 3); + } + } + + // CBUG add test that configures output file for external compaction + + // CBUG add test that verifies iterators configured on table (not on user compaction) are used in + // external compaction + + @Test + // @Ignore // waiting for solution to issue #2019 public void testExternalCompactionDeadTServer() throws Exception { // Shut down the normal TServers getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> { @@ -254,7 +295,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { } int expectedCount = 0; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < MAX_DATA; i++) { if (i % modulus == 0) expectedCount++; } @@ -286,11 +327,22 @@ public class ExternalCompactionIT extends ConfigurableMacBase { } + private void createTable(AccumuloClient client, String tableName, String service, + SortedSet<Text> splits) throws Exception { + Map<String,String> props = + Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), + "table.compaction.dispatcher.opts.service", service); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props).withSplits(splits); + + client.tableOperations().create(tableName, ntc); + + } + private void writeData(AccumuloClient client, String table1) throws MutationsRejectedException, TableNotFoundException, AccumuloException, AccumuloSecurityException { try (BatchWriter bw = client.createBatchWriter(table1)) { - for (int i = 0; i < 10; i++) { - Mutation m = new Mutation("r:" + i); + for (int i = 0; i < MAX_DATA; i++) { + Mutation m = new Mutation(row(i)); m.put("", "", "" + i); bw.addMutation(m); }