Refactor how we track live size. Patch by marcuse; reviewed by yukim for CASSANDRA-7852
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5160c916 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5160c916 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5160c916 Branch: refs/heads/trunk Commit: 5160c916c90886f69023ddba0078a624e5cf202d Parents: 9c316e7 Author: Marcus Eriksson <marc...@apache.org> Authored: Fri Oct 17 14:15:46 2014 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Nov 3 16:39:19 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/DataTracker.java | 109 ++++++++++++------- .../db/compaction/CompactionManager.java | 26 ++--- .../cassandra/db/compaction/CompactionTask.java | 7 +- .../cassandra/db/compaction/Scrubber.java | 12 +- .../cassandra/db/compaction/Upgrader.java | 31 +++--- .../io/sstable/IndexSummaryManager.java | 2 +- .../cassandra/io/sstable/SSTableRewriter.java | 90 ++++----------- .../db/compaction/AntiCompactionTest.java | 48 +++++++- .../io/sstable/IndexSummaryManagerTest.java | 2 +- .../cassandra/io/sstable/SSTableReaderTest.java | 2 +- .../io/sstable/SSTableRewriterTest.java | 57 ++++++---- 12 files changed, 219 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 681d616..32083cc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.2 + * Refactor how we track live size (CASSANDRA-7852) * Make sure unfinished compaction files are removed (CASSANDRA-8124) * Fix shutdown when run as Windows service (CASSANDRA-8136) * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/DataTracker.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index 7393323..7df2b75 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -254,33 +254,36 @@ public class DataTracker public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) { - replace(sstables, Collections.<SSTableReader>emptyList()); + removeSSTablesFromTracker(sstables); + releaseReferences(sstables, false); notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType); } + /** + * + * @param oldSSTables + * @param allReplacements + * @param compactionType + */ // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call - public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType) + public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType) { - replace(sstables, Collections.<SSTableReader>emptyList()); - notifySSTablesChanged(sstables, allReplacements, compactionType); - for (SSTableReader sstable : allReplacements) - { - long bytesOnDisk = sstable.bytesOnDisk(); - cfstore.metric.totalDiskSpaceUsed.inc(bytesOnDisk); - cfstore.metric.liveDiskSpaceUsed.inc(bytesOnDisk); - } + removeSSTablesFromTracker(oldSSTables); + releaseReferences(oldSSTables, false); + notifySSTablesChanged(oldSSTables, allReplacements, compactionType); + addNewSSTablesSize(allReplacements); } public void addInitialSSTables(Collection<SSTableReader> sstables) { - replace(Collections.<SSTableReader>emptyList(), 
sstables); + addSSTablesToTracker(sstables); // no notifications or backup necessary } public void addSSTables(Collection<SSTableReader> sstables) { - replace(Collections.<SSTableReader>emptyList(), sstables); + addSSTablesToTracker(sstables); for (SSTableReader sstable : sstables) { maybeIncrementallyBackup(sstable); @@ -289,6 +292,32 @@ public class DataTracker } /** + * Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance + * + * @param toReplace + * @param replaceWith + */ + public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith) + { + replaceReaders(toReplace, replaceWith, true); + } + + /** + * Adds the early opened files to the data tracker, but does not tell compaction strategies about it + * + * note that we dont track the live size of these sstables + * @param toReplace + * @param replaceWith + */ + public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith) + { + for (SSTableReader s : toReplace) + assert s.openReason.equals(SSTableReader.OpenReason.EARLY); + // note that we can replace an early opened file with a real one + replaceReaders(toReplace, replaceWith, false); + } + + /** * removes all sstables that are not busy compacting. 
*/ public void unreferenceSSTables() @@ -310,7 +339,8 @@ public class DataTracker return; } notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN); - postReplace(notCompacting, Collections.<SSTableReader>emptySet(), true); + removeOldSSTablesSize(notCompacting); + releaseReferences(notCompacting, true); } /** @@ -344,11 +374,11 @@ public class DataTracker void init() { view.set(new View( - ImmutableList.of(new Memtable(cfstore)), - ImmutableList.<Memtable>of(), - Collections.<SSTableReader>emptySet(), - Collections.<SSTableReader>emptySet(), - SSTableIntervalTree.empty())); + ImmutableList.of(new Memtable(cfstore)), + ImmutableList.<Memtable>of(), + Collections.<SSTableReader>emptySet(), + Collections.<SSTableReader>emptySet(), + SSTableIntervalTree.empty())); } /** @@ -358,7 +388,7 @@ public class DataTracker * @param oldSSTables replaced readers * @param newSSTables replacement readers */ - public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify) + private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify) { View currentView, newView; do @@ -369,7 +399,7 @@ public class DataTracker while (!view.compareAndSet(currentView, newView)); if (!oldSSTables.isEmpty() && notify) - notifySSTablesChanged(oldSSTables, newSSTables, OperationType.COMPACTION); + notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN); for (SSTableReader sstable : newSSTables) sstable.setTrackedBy(this); @@ -378,29 +408,28 @@ public class DataTracker sstable.releaseReference(); } - private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) + private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables) { - if (!cfstore.isValid()) - { - removeOldSSTablesSize(replacements, false); - replacements = Collections.emptyList(); - } - View currentView, newView; do { 
currentView = view.get(); - newView = currentView.replace(oldSSTables, replacements); + newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList()); } while (!view.compareAndSet(currentView, newView)); - - postReplace(oldSSTables, replacements, false); + removeOldSSTablesSize(oldSSTables); } - private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements, boolean tolerateCompacted) + private void addSSTablesToTracker(Collection<SSTableReader> sstables) { - addNewSSTablesSize(replacements); - removeOldSSTablesSize(oldSSTables, tolerateCompacted); + View currentView, newView; + do + { + currentView = view.get(); + newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables); + } + while (!view.compareAndSet(currentView, newView)); + addNewSSTablesSize(sstables); } private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables) @@ -418,7 +447,7 @@ public class DataTracker } } - private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted) + private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables) { for (SSTableReader sstable : oldSSTables) { @@ -428,13 +457,15 @@ public class DataTracker long size = sstable.bytesOnDisk(); StorageMetrics.load.dec(size); cfstore.metric.liveDiskSpaceUsed.dec(size); + } + } - // tolerateCompacted will be true when the CFS is no longer valid (dropped). 
If there were ongoing - // compactions when it was invalidated, sstables may already be marked compacted, so we should - // tolerate that (see CASSANDRA-5957) + private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted) + { + for (SSTableReader sstable : oldSSTables) + { boolean firstToCompact = sstable.markObsolete(); - assert (tolerateCompacted || firstToCompact) : sstable + " was already marked compacted"; - + assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted"; sstable.releaseReference(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 51f45b8..84c3cb5 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -688,8 +688,9 @@ public class CompactionManager implements CompactionManagerMBean CleanupInfo ci = new CleanupInfo(sstable, scanner); metrics.beginCompaction(ci); - SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false); - + Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable); + SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false); + List<SSTableReader> finished; try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs))) { writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable)); @@ -711,7 +712,8 @@ public class CompactionManager implements CompactionManagerMBean // flush to ensure we don't lose the tombstones on 
a restart, since they are not commitlog'd cfs.indexManager.flushIndexesBlocking(); - writer.finish(); + finished = writer.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP); } catch (Throwable e) { @@ -724,17 +726,16 @@ public class CompactionManager implements CompactionManagerMBean metrics.finishCompaction(ci); } - List<SSTableReader> results = writer.finished(); - if (!results.isEmpty()) + if (!finished.isEmpty()) { String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms."; long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); long startsize = sstable.onDiskLength(); long endsize = 0; - for (SSTableReader newSstable : results) + for (SSTableReader newSstable : finished) endsize += newSstable.onDiskLength(); double ratio = (double) endsize / (double) startsize; - logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime)); + logger.info(String.format(format, finished.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime)); } } @@ -994,8 +995,8 @@ public class CompactionManager implements CompactionManagerMBean sstableAsSet.add(sstable); File destination = cfs.directories.getDirectoryForNewSSTables(); - SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false); + SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false); + SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false); AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new 
HashSet<>(Collections.singleton(sstable))); @@ -1024,11 +1025,10 @@ public class CompactionManager implements CompactionManagerMBean } // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness - repairedSSTableWriter.finish(false, repairedAt); - unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE); // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt - anticompactedSSTables.addAll(repairedSSTableWriter.finished()); - anticompactedSSTables.addAll(unRepairedSSTableWriter.finished()); + anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt)); + anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE)); + cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index d2ae04a..b442482 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -152,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask { AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller); Iterator<AbstractCompactedRow> iter = ci.iterator(); - + List<SSTableReader> newSStables; // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to // replace the old entries. Track entries to preheat here until then. 
long minRepairedAt = getMinRepairedAt(actuallyCompact); @@ -161,7 +161,7 @@ public class CompactionTask extends AbstractCompactionTask if (collector != null) collector.beginCompaction(ci); long lastCheckObsoletion = start; - SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline); + SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline); try { if (!iter.hasNext()) @@ -197,7 +197,7 @@ public class CompactionTask extends AbstractCompactionTask } // don't replace old sstables yet, as we need to mark the compaction finished in the system table - writer.finish(false); + newSStables = writer.finish(); } catch (Throwable t) { @@ -217,7 +217,6 @@ public class CompactionTask extends AbstractCompactionTask } Collection<SSTableReader> oldSStables = this.sstables; - List<SSTableReader> newSStables = writer.finished(); if (!offline) cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index b3d098d..0cd71f2 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -22,6 +22,7 @@ import java.io.*; import java.util.*; import com.google.common.base.Throwables; +import com.google.common.collect.Sets; import org.apache.cassandra.db.*; import org.apache.cassandra.io.sstable.*; @@ -107,7 +108,8 @@ public class Scrubber implements Closeable public void scrub() { outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length())); - SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, 
isOffline); + Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable); + SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline); try { ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); @@ -256,9 +258,11 @@ public class Scrubber implements Closeable } // finish obsoletes the old sstable - writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt); - if (!writer.finished().isEmpty()) - newSstable = writer.finished().get(0); + List<SSTableReader> finished = writer.finish(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt); + if (!finished.isEmpty()) + newSstable = finished.get(0); + if (!isOffline) + cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index f102fef..39f668d 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.*; import com.google.common.base.Throwables; +import com.google.common.collect.Sets; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; @@ -35,7 +36,6 @@ public class Upgrader { private final ColumnFamilyStore cfs; private final SSTableReader sstable; - private final Set<SSTableReader> toUpgrade; private final File directory; private final OperationType compactionType = OperationType.UPGRADE_SSTABLES; @@ -49,7 +49,6 @@ public class Upgrader { this.cfs = cfs; this.sstable = sstable; - this.toUpgrade = new 
HashSet<>(Collections.singleton(sstable)); this.outputHandler = outputHandler; this.directory = new File(sstable.getFilename()).getParentFile(); @@ -57,8 +56,8 @@ public class Upgrader this.controller = new UpgradeController(cfs); this.strategy = cfs.getCompactionStrategy(); - long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade)); - long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableBytes()); + long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable))); + long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategy.getMaxSSTableBytes()); this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } @@ -68,27 +67,22 @@ public class Upgrader // Get the max timestamp of the precompacted sstables // and adds generation of live ancestors - // -- note that we always only have one SSTable in toUpgrade here: - for (SSTableReader sstable : toUpgrade) + sstableMetadataCollector.addAncestor(sstable.descriptor.generation); + for (Integer i : sstable.getAncestors()) { - sstableMetadataCollector.addAncestor(sstable.descriptor.generation); - for (Integer i : sstable.getAncestors()) - { - if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists()) - sstableMetadataCollector.addAncestor(i); - } - sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); + if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists()) + sstableMetadataCollector.addAncestor(i); } - + sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector); } public void upgrade() { outputHandler.output("Upgrading " + sstable); - - 
SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true); - try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade)) + Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable); + SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true); + try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade)) { Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator(); writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); @@ -98,7 +92,8 @@ public class Upgrader writer.append(row); } - writer.finish(); + List<SSTableReader> sstables = writer.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(toUpgrade, sstables, OperationType.UPGRADE_SSTABLES); outputHandler.output("Upgrade of " + sstable + " complete."); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index cc60b4d..65b25a4 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -416,7 +416,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean for (DataTracker tracker : replacedByTracker.keySet()) { - tracker.replaceReaders(replacedByTracker.get(tracker), replacementsByTracker.get(tracker), true); + tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker)); newSSTables.addAll(replacementsByTracker.get(tracker)); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index 2c9fe7e..4d5a06f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -18,7 +18,6 @@ package org.apache.cassandra.io.sstable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -36,7 +35,6 @@ import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -67,8 +65,6 @@ public class SSTableRewriter preemptiveOpenInterval = interval; } - private boolean isFinished = false; - @VisibleForTesting static void overrideOpenInterval(long size) { @@ -86,16 +82,14 @@ public class SSTableRewriter private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at - private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>(); - private final OperationType rewriteType; // the type of rewrite/compaction being performed private final boolean isOffline; // true for operations that 
are performed without Cassandra running (prevents updates of DataTracker) private SSTableWriter writer; private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>(); - public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline) + public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline) { this.rewriting = rewriting; for (SSTableReader sstable : rewriting) @@ -106,7 +100,6 @@ public class SSTableRewriter this.dataTracker = cfs.getDataTracker(); this.cfs = cfs; this.maxAge = maxAge; - this.rewriteType = rewriteType; this.isOffline = isOffline; } @@ -147,28 +140,18 @@ public class SSTableRewriter // attempts to append the row, if fails resets the writer position public RowIndexEntry tryAppend(AbstractCompactedRow row) { - mark(); + writer.mark(); try { return append(row); } catch (Throwable t) { - resetAndTruncate(); + writer.resetAndTruncate(); throw t; } } - private void mark() - { - writer.mark(); - } - - private void resetAndTruncate() - { - writer.resetAndTruncate(); - } - private void maybeReopenEarly(DecoratedKey key) { if (FBUtilities.isUnix() && writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval) @@ -186,7 +169,7 @@ public class SSTableRewriter SSTableReader reader = writer.openEarly(maxAge); if (reader != null) { - replaceReader(currentlyOpenedEarly, reader, false); + replaceEarlyOpenedFile(currentlyOpenedEarly, reader); currentlyOpenedEarly = reader; currentlyOpenedEarlyAt = writer.getFilePointer(); moveStarts(reader, Functions.constant(reader.last), false); @@ -222,7 +205,7 @@ public class SSTableRewriter // releases reference in replaceReaders if (!isOffline) { - dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false); + dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList()); dataTracker.unmarkCompacting(close); } } @@ -276,12 +259,14 @@ public 
class SSTableRewriter })); } } - replaceReaders(toReplace, replaceWith, true); + cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith); rewriting.removeAll(toReplace); rewriting.addAll(replaceWith); } - private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith, boolean notify) + + + private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith) { if (isOffline) return; @@ -296,14 +281,7 @@ public class SSTableRewriter dataTracker.markCompacting(Collections.singleton(replaceWith)); toReplaceSet = Collections.emptySet(); } - replaceReaders(toReplaceSet, Collections.singleton(replaceWith), notify); - } - - private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith, boolean notify) - { - if (isOffline) - return; - dataTracker.replaceReaders(toReplace, replaceWith, notify); + dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith)); } public void switchWriter(SSTableWriter newWriter) @@ -318,7 +296,7 @@ public class SSTableRewriter if (reader != null) { finishedOpenedEarly.add(reader); - replaceReader(currentlyOpenedEarly, reader, false); + replaceEarlyOpenedFile(currentlyOpenedEarly, reader); moveStarts(reader, Functions.constant(reader.last), false); } finishedWriters.add(Pair.create(writer, reader)); @@ -327,38 +305,34 @@ public class SSTableRewriter writer = newWriter; } - public void finish() - { - finish(-1); - } - public void finish(long repairedAt) - { - finish(true, repairedAt); - } - public void finish(boolean cleanupOldReaders) + public List<SSTableReader> finish() { - finish(cleanupOldReaders, -1); + return finish(-1); } /** * Finishes the new file(s) * - * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the - * old files as compacted if cleanupOldReaders is set to true. 
Otherwise it is up to the caller to do those gymnastics - * (ie, call DataTracker#markCompactedSSTablesReplaced(..)) + * Creates final files, adds the new files to the dataTracker (via replaceEarlyOpenedFile). + * + * We add them to the tracker to be able to get rid of the tmpfiles + * + * It is up to the caller to do the compacted sstables replacement + * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..)) + * * - * @param cleanupOldReaders if we should replace the old files with the new ones * @param repairedAt the repair time, -1 if we should use the time we supplied when we created * the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the * repair time. */ - public void finish(boolean cleanupOldReaders, long repairedAt) + public List<SSTableReader> finish(long repairedAt) { + List<SSTableReader> finished = new ArrayList<>(); if (writer.getFilePointer() > 0) { SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt); finished.add(reader); - replaceReader(currentlyOpenedEarly, reader, false); + replaceEarlyOpenedFile(currentlyOpenedEarly, reader); moveStarts(reader, Functions.constant(reader.last), false); } else @@ -373,7 +347,7 @@ public class SSTableRewriter SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt); finished.add(newReader); // w.right is the tmplink-reader we added when switching writer, replace with the real sstable. 
- replaceReader(w.right, newReader, false); + replaceEarlyOpenedFile(w.right, newReader); } else { @@ -384,23 +358,7 @@ public class SSTableRewriter if (!isOffline) { dataTracker.unmarkCompacting(finished); - if (cleanupOldReaders) - dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType); } - else if (cleanupOldReaders) - { - for (SSTableReader reader : rewriting) - { - reader.markObsolete(); - reader.releaseReference(); - } - } - isFinished = true; - } - - public List<SSTableReader> finished() - { - assert isFinished; return finished; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 6e1ac5f..5ed4f4a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.db.ArrayBackedSortedColumns; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; @@ -41,6 +43,9 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; +import org.apache.cassandra.io.sstable.SSTableWriter; +import 
org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.junit.After; import org.junit.Test; @@ -89,7 +94,48 @@ public class AntiCompactionTest extends SchemaLoader assertEquals(repairedKeys, 4); assertEquals(nonRepairedKeys, 6); } - + @Test + public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.disableAutoCompaction(); + SSTableReader s = writeFile(cfs, 1000); + cfs.addSSTable(s); + long origSize = s.bytesOnDisk(); + System.out.println(cfs.metric.liveDiskSpaceUsed.count()); + Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500))); + Collection<SSTableReader> sstables = cfs.getSSTables(); + SSTableReader.acquireReferences(sstables); + CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345); + long sum = 0; + for (SSTableReader x : cfs.getSSTables()) + sum += x.bytesOnDisk(); + assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count()); + assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000); + + } + + private SSTableReader writeFile(ColumnFamilyStore cfs, int count) + { + ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); + for (int i = 0; i < count; i++) + cf.addColumn(Util.column(String.valueOf(i), "a", 1)); + File dir = cfs.directories.getDirectoryForNewSSTables(); + String filename = cfs.getTempSSTablePath(dir); + + SSTableWriter writer = new SSTableWriter(filename, + 0, + 0, + cfs.metadata, + StorageService.getPartitioner(), + new MetadataCollector(cfs.metadata.comparator)); + + for (int i = 0; i < count * 5; i++) + writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf); + return 
writer.closeAndOpenReader(); + } + @Test public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index b621c45..0a2b5a6 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -428,7 +428,7 @@ public class IndexSummaryManagerTest extends SchemaLoader } // don't leave replaced SSTRs around to break other tests - cfs.getDataTracker().replaceReaders(Collections.singleton(original), Collections.singleton(sstable), true); + cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 7f85019..6f8ab62 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -412,7 +412,7 @@ public class SSTableReaderTest extends SchemaLoader } SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1); - store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement), true); + store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement)); for (Future future : futures) future.get(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 8b203ac..4d248bd 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; import com.google.common.collect.Sets; import org.junit.Test; @@ -40,6 +41,7 @@ import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.assertEquals; @@ -66,7 +68,7 @@ public class SSTableRewriterTest extends SchemaLoader cfs.forceBlockingFlush(); Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); - SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false); + SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); ICompactionScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); @@ -76,7 +78,7 @@ public class SSTableRewriterTest extends SchemaLoader AbstractCompactedRow row = new 
LazilyCompactedRow(controller, Arrays.asList(scanner.next())); writer.append(row); } - writer.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION); validateCFS(cfs); @@ -142,7 +144,7 @@ public class SSTableRewriterTest extends SchemaLoader @Test - public void testNumberOfFiles() throws InterruptedException + public void testNumberOfFilesAndSizes() throws InterruptedException { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -150,10 +152,10 @@ public class SSTableRewriterTest extends SchemaLoader SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); - + long startStorageMetricsLoad = StorageMetrics.load.count(); Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); ICompactionScanner scanner = s.getScanner(); @@ -167,13 +169,23 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
+ assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count()); + assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count()); + } } - rewriter.finish(); - assertEquals(files, rewriter.finished().size()); + List<SSTableReader> sstables = rewriter.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); + long sum = 0; + for (SSTableReader x : cfs.getSSTables()) + sum += x.bytesOnDisk(); + assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count()); + assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count()); + assertEquals(files, sstables.size()); assertEquals(files, cfs.getSSTables().size()); Thread.sleep(1000); // tmplink and tmp files should be gone: + assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count()); assertFileCounts(s.descriptor.directory.list(), 0, 0); validateCFS(cfs); } @@ -190,7 +202,7 @@ public class SSTableRewriterTest extends SchemaLoader Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); ICompactionScanner scanner = s.getScanner(); @@ -206,10 +218,10 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
} } - rewriter.finish(false); - assertEquals(files, rewriter.finished().size()); + List<SSTableReader> sstables = rewriter.finish(); + assertEquals(files, sstables.size()); assertEquals(files + 1, cfs.getSSTables().size()); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); assertEquals(files, cfs.getSSTables().size()); Thread.sleep(1000); assertFileCounts(s.descriptor.directory.list(), 0, 0); @@ -226,11 +238,12 @@ public class SSTableRewriterTest extends SchemaLoader SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); + long startSize = cfs.metric.liveDiskSpaceUsed.count(); DecoratedKey origFirst = s.first; DecoratedKey origLast = s.last; Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); ICompactionScanner scanner = s.getScanner(); @@ -248,6 +261,7 @@ public class SSTableRewriterTest extends SchemaLoader } rewriter.abort(); Thread.sleep(1000); + assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count()); assertEquals(1, cfs.getSSTables().size()); assertFileCounts(s.descriptor.directory.list(), 0, 0); assertEquals(cfs.getSSTables().iterator().next().first, origFirst); @@ -270,7 +284,7 @@ public class SSTableRewriterTest extends SchemaLoader DecoratedKey origLast = s.last; Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory)); ICompactionScanner scanner = s.getScanner(); @@ -313,7 +327,7 @@ public class SSTableRewriterTest extends SchemaLoader Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); ICompactionScanner scanner = s.getScanner(); @@ -331,7 +345,8 @@ public class SSTableRewriterTest extends SchemaLoader if (files == 3) { //testing to finish when we have nothing written in the new file - rewriter.finish(); + List<SSTableReader> sstables = rewriter.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); break; } } @@ -353,7 +368,7 @@ public class SSTableRewriterTest extends SchemaLoader cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); ICompactionScanner scanner = s.getScanner(); @@ -369,7 +384,8 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
} } - rewriter.finish(); + List<SSTableReader> sstables = rewriter.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); Thread.sleep(1000); assertFileCounts(s.descriptor.directory.list(), 0, 0); cfs.truncateBlocking(); @@ -389,7 +405,7 @@ public class SSTableRewriterTest extends SchemaLoader cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(1000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); ICompactionScanner scanner = s.getScanner(); @@ -406,8 +422,9 @@ public class SSTableRewriterTest extends SchemaLoader files++; } } - rewriter.finish(); - assertEquals(files, rewriter.finished().size()); + List<SSTableReader> sstables = rewriter.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); + assertEquals(files, sstables.size()); assertEquals(files, cfs.getSSTables().size()); Thread.sleep(1000); assertFileCounts(s.descriptor.directory.list(), 0, 0);