Repository: cassandra Updated Branches: refs/heads/trunk 060c7961d -> 37f517593
Anticompact sstables as groups Patch by Russell Spitzer; reviewed by marcuse for CASSANDRA-6851 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37f51759 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37f51759 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37f51759 Branch: refs/heads/trunk Commit: 37f5175935a37ce2c005335c2f486efb827b6eba Parents: 060c796 Author: Russell Spitzer <russell.spit...@gmail.com> Authored: Wed Aug 20 13:27:52 2014 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Aug 20 13:27:52 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/AbstractCompactionStrategy.java | 31 ++++ .../db/compaction/CompactionManager.java | 174 +++++++++++++------ .../compaction/LeveledCompactionStrategy.java | 57 +++++- .../db/compaction/AntiCompactionTest.java | 96 +++++++++- .../LeveledCompactionStrategyTest.java | 61 ++++++- 6 files changed, 358 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d1dedbf..80ddddc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Do anticompaction in groups (CASSANDRA-6851) * Verify that UDF class methods are static (CASSANDRA-7781) * Support pure user-defined functions (CASSANDRA-7395) * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 1b7786e..28ab84e 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -399,4 +399,35 @@ public abstract class AbstractCompactionStrategy return optionValue == null || Boolean.parseBoolean(optionValue); } + + + /** + * Method for grouping similar SSTables together. This will be used by + * anti-compaction to determine which SSTables should be anticompacted + * as a group. If a given compaction strategy creates sstables which + * cannot be merged due to some constraint, it must override this method. + */ + public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) + { + int groupSize = 2; + List<SSTableReader> sortedSSTablesToGroup = new ArrayList<>(sstablesToGroup); + Collections.sort(sortedSSTablesToGroup, SSTableReader.sstableComparator); + + Collection<Collection<SSTableReader>> groupedSSTables = new ArrayList<>(); + Collection<SSTableReader> currGroup = new ArrayList<>(); + + for (SSTableReader sstable : sortedSSTablesToGroup) + { + currGroup.add(sstable); + if (currGroup.size() == groupSize) + { + groupedSSTables.add(currGroup); + currGroup = new ArrayList<>(); + } + } + + if (currGroup.size() != 0) + groupedSSTables.add(currGroup); + return groupedSSTables; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index e578ddf..5af7139 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -52,6 +52,7 @@ 
import com.google.common.collect.Multimap; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -397,7 +398,7 @@ public class CompactionManager implements CompactionManagerMBean Collection<SSTableReader> validatedForRepair, long repairedAt) throws InterruptedException, ExecutionException, IOException { - logger.info("Starting anticompaction"); + logger.info("Starting anticompaction for {}/{}", cfs.keyspace.getName(), cfs.getColumnFamilyName()); logger.debug("Starting anticompaction for ranges {}", ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); @@ -847,6 +848,37 @@ public class CompactionManager implements CompactionManagerMBean new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel())); } + public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs, + File compactionFileLocation, + int expectedBloomFilterSize, + long repairedAt, + Collection<SSTableReader> sstables) + { + FileUtils.createDirectory(compactionFileLocation); + int minLevel = Integer.MAX_VALUE; + // if all sstables have the same level, we can compact them together without creating overlap during anticompaction + // note that we only anticompact from unrepaired sstables, which is not leveled, but we still keep original level + // after first migration to be able to drop the sstables back in their original place in the repaired sstable manifest + for (SSTableReader sstable : sstables) + { + if (minLevel == Integer.MAX_VALUE) + minLevel = sstable.getSSTableLevel(); + + if (minLevel != sstable.getSSTableLevel()) + { + minLevel = 0; + break; + } + } + return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation), + expectedBloomFilterSize, + repairedAt, + cfs.metadata, + cfs.partitioner, + new 
MetadataCollector(sstables, cfs.metadata.comparator, minLevel)); + } + + /** * Performs a readonly "compaction" of all sstables in order to validate complete rows, * but without writing the merge result @@ -947,6 +979,8 @@ public class CompactionManager implements CompactionManagerMBean } } + + /** * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted @@ -956,83 +990,111 @@ public class CompactionManager implements CompactionManagerMBean * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field. */ - private Collection<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<SSTableReader> repairedSSTables, long repairedAt) + private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, + Collection<SSTableReader> repairedSSTables, long repairedAt) { - List<SSTableReader> anticompactedSSTables = new ArrayList<>(); - int repairedKeyCount = 0; - int unrepairedKeyCount = 0; // TODO(5351): we can do better here: - int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(repairedSSTables))); logger.info("Performing anticompaction on {} sstables", repairedSSTables.size()); + + //Group SSTables + Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repairedSSTables); // iterate over sstables to check if the repaired / unrepaired ranges intersect them. 
- for (SSTableReader sstable : repairedSSTables) + int antiCompactedSSTableCount = 0; + for (Collection<SSTableReader> sstableGroup : groupedSSTables) { - // check that compaction hasn't stolen any sstables used in previous repair sessions - // if we need to skip the anticompaction, it will be carried out by the next repair + int antiCompacted = antiCompactGroup(cfs, ranges, sstableGroup, repairedAt); + antiCompactedSSTableCount += antiCompacted; + } + + String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; + logger.info(format, repairedSSTables.size(), antiCompactedSSTableCount); + } + + private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, + Collection<SSTableReader> anticompactionGroup, long repairedAt) + { + long groupMaxDataAge = -1; + + // check that compaction hasn't stolen any sstables used in previous repair sessions + // if we need to skip the anticompaction, it will be carried out by the next repair + for (Iterator<SSTableReader> i = anticompactionGroup.iterator(); i.hasNext();) + { + SSTableReader sstable = i.next(); if (!new File(sstable.getFilename()).exists()) { logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable); + i.remove(); continue; } + if (groupMaxDataAge < sstable.maxDataAge) + groupMaxDataAge = sstable.maxDataAge; + } + + if (anticompactionGroup.size() == 0) + { + logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available"); + return 0; + } - logger.info("Anticompacting {}", sstable); - Set<SSTableReader> sstableAsSet = new HashSet<>(); - sstableAsSet.add(sstable); + logger.info("Anticompacting {}", anticompactionGroup); + Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup); - File destination = cfs.directories.getDirectoryForNewSSTables(); - SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, 
OperationType.ANTICOMPACTION, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false); + File destination = cfs.directories.getDirectoryForNewSSTables(); + SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false); + SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false); - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable)); + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + List<ICompactionScanner> scanners = strategy.getScanners(anticompactionGroup); - try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS)) - { - repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable)); - unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable)); + int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup))); - CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller); + long repairedKeyCount = 0; + long unrepairedKeyCount = 0; + try (CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS)) + { + repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); + unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, 
ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); - try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator()) + CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller); + + try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator()) + { + while(iter.hasNext()) { - while(iter.hasNext()) + AbstractCompactedRow row = iter.next(); + // if current range from sstable is repaired, save it into the new repaired sstable + if (Range.isInRanges(row.key.getToken(), ranges)) + { + repairedSSTableWriter.append(row); + repairedKeyCount++; + } + // otherwise save into the new 'non-repaired' table + else { - AbstractCompactedRow row = iter.next(); - // if current range from sstable is repaired, save it into the new repaired sstable - if (Range.isInRanges(row.key.getToken(), ranges)) - { - repairedSSTableWriter.append(row); - repairedKeyCount++; - } - // otherwise save into the new 'non-repaired' table - else - { - unRepairedSSTableWriter.append(row); - unrepairedKeyCount++; - } + unRepairedSSTableWriter.append(row); + unrepairedKeyCount++; } } - // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them - // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness - repairedSSTableWriter.finish(false, repairedAt); - unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE); - // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt - anticompactedSSTables.addAll(repairedSSTableWriter.finished()); - anticompactedSSTables.addAll(unRepairedSSTableWriter.finished()); - } - catch (Throwable e) - { - logger.error("Error anticompacting " + sstable, e); - repairedSSTableWriter.abort(); - unRepairedSSTableWriter.abort(); } + // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them + // so that the second one can do so safely, without leaving 
us with references < 0 or any other ugliness + repairedSSTableWriter.finish(false, repairedAt); + unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE); + // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt + logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount, + repairedKeyCount + unrepairedKeyCount, + cfs.keyspace.getName(), + cfs.getColumnFamilyName(), + anticompactionGroup); + return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size(); } - String format = "Repaired {} keys of {} for {}/{}"; - logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName()); - String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; - logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size()); - - return anticompactedSSTables; + catch (Throwable e) + { + logger.error("Error anticompacting " + anticompactionGroup, e); + repairedSSTableWriter.abort(); + unRepairedSSTableWriter.abort(); + } + return 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 3ee59ad..b179b3a 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -18,7 +18,18 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import 
java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -170,6 +181,50 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem return new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableBytes); } + /** + * Leveled compaction strategy has guarantees on the data contained within each level so we + * have to make sure we only create groups of SSTables with members from the same level. + * This way we won't end up creating invalid sstables during anti-compaction. + * @param ssTablesToGroup + * @return Groups of sstables from the same level + */ + @Override + public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> ssTablesToGroup) + { + int groupSize = 2; + Map<Integer, Collection<SSTableReader>> sstablesByLevel = new HashMap<>(); + for (SSTableReader sstable : ssTablesToGroup) + { + Integer level = sstable.getSSTableLevel(); + if (!sstablesByLevel.containsKey(level)) + { + sstablesByLevel.put(level, new ArrayList<SSTableReader>()); + } + sstablesByLevel.get(level).add(sstable); + } + + Collection<Collection<SSTableReader>> groupedSSTables = new ArrayList<>(); + + for (Collection<SSTableReader> levelOfSSTables : sstablesByLevel.values()) + { + Collection<SSTableReader> currGroup = new ArrayList<>(); + for (SSTableReader sstable : levelOfSSTables) + { + currGroup.add(sstable); + if (currGroup.size() == groupSize) + { + groupedSSTables.add(currGroup); + currGroup = new ArrayList<>(); + } + } + + if (currGroup.size() != 0) + groupedSSTables.add(currGroup); + } + return groupedSSTables; + + } + public int getEstimatedRemainingTasks() { return manifest.getEstimatedTasks(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index e47f0e9..f632a65 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -27,6 +27,7 @@ import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.SimpleStrategy; import org.junit.BeforeClass; +import org.junit.After; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -41,6 +42,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; import static junit.framework.Assert.assertFalse; import static org.junit.Assert.assertEquals; @@ -51,14 +53,23 @@ public class AntiCompactionTest private static final String KEYSPACE1 = "AntiCompactionTest"; private static final String CF = "Standard1"; + @BeforeClass public static void defineSchema() throws ConfigurationException { SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF)); + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF)); + } + + @After + public void truncateCF() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.truncateBlocking(); } @Test @@ -113,4 +124,83 @@ public class AntiCompactionTest assertEquals(repairedKeys, 4); assertEquals(nonRepairedKeys, 6); } + + + public void 
generateSStable(ColumnFamilyStore store, String Suffix) + { + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i) + "-" + Suffix); + Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + for (int j = 0; j < 10; j++) + rm.add("Standard1", Util.cellname(Integer.toString(j)), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp, + 0); + rm.apply(); + } + store.forceBlockingFlush(); + } + + @Test + public void antiCompactTenSTC() throws InterruptedException, ExecutionException, IOException{ + antiCompactTen("SizeTieredCompactionStrategy"); + } + + @Test + public void antiCompactTenLC() throws InterruptedException, ExecutionException, IOException{ + antiCompactTen("LeveledCompactionStrategy"); + } + + public void antiCompactTen(String compactionStrategy) throws InterruptedException, ExecutionException, IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.setCompactionStrategyClass(compactionStrategy); + store.disableAutoCompaction(); + + for (int table = 0; table < 10; table++) + { + generateSStable(store,Integer.toString(table)); + } + Collection<SSTableReader> sstables = store.getUnrepairedSSTables(); + assertEquals(store.getSSTables().size(), sstables.size()); + + Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); + + SSTableReader.acquireReferences(sstables); + long repairedAt = 1000; + CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt); + /* + Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time + so there will be no net change in the number of sstables + */ + assertEquals(10, store.getSSTables().size()); + int repairedKeys = 0; + int nonRepairedKeys = 0; + for (SSTableReader sstable : store.getSSTables()) + { + SSTableScanner scanner = 
sstable.getScanner(); + while (scanner.hasNext()) + { + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + if (sstable.isRepaired()) + { + assertTrue(range.contains(row.getKey().getToken())); + assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt); + repairedKeys++; + } + else + { + assertFalse(range.contains(row.getKey().getToken())); + assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); + nonRepairedKeys++; + } + } + } + assertEquals(repairedKeys, 40); + assertEquals(nonRepairedKeys, 60); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 5f9b72b..7eec449 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -18,7 +18,13 @@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.junit.After; import org.junit.Before; @@ -30,7 +36,10 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.dht.Range; import 
org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -84,6 +93,54 @@ public class LeveledCompactionStrategyTest cfs.truncateBlocking(); } + /** + * Ensure that the grouping operation preserves the levels of grouped tables + */ + @Test + public void testGrouperLevels() throws Exception{ + ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files + + // Enough data to have a level 1 and 2 + int rows = 20; + int columns = 10; + + // Adds enough data to trigger multiple sstable per level + for (int r = 0; r < rows; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + for (int c = 0; c < columns; c++) + { + rm.add(CF_STANDARDDLEVELED, Util.cellname("column" + c), value, 0); + } + rm.apply(); + cfs.forceBlockingFlush(); + } + + waitForLeveling(cfs); + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy(); + // Checking we're not completely bad at math + assert strategy.getLevelSize(1) > 0; + assert strategy.getLevelSize(2) > 0; + + Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(cfs.getSSTables()); + for (Collection<SSTableReader> sstableGroup : groupedSSTables) + { + int groupLevel = -1; + Iterator<SSTableReader> it = sstableGroup.iterator(); + while (it.hasNext()) + { + + SSTableReader sstable = it.next(); + int tableLevel = sstable.getSSTableLevel(); + if (groupLevel == -1) + groupLevel = tableLevel; + assert groupLevel == tableLevel; + } + } + + } + /* * This exercises in particular the code of #4142 */