Repository: cassandra Updated Branches: refs/heads/cassandra-3.11 d17836dec -> a03424ef9 refs/heads/trunk caf50de31 -> b80f6c65f
Correct sstable sorting for garbagecollect and levelled compaction patch by Branimir Lambov and Vincent White; reviewed by Zhao Yang for CASSANDRA-14870 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a03424ef Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a03424ef Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a03424ef Branch: refs/heads/cassandra-3.11 Commit: a03424ef95559c9df2bb7f86e1ac1edca1436058 Parents: d17836d Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Nov 7 13:10:39 2018 +0200 Committer: Branimir Lambov <branimir.lam...@datastax.com> Committed: Tue Nov 13 12:50:08 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/SinglePartitionReadCommand.java | 4 +- .../db/compaction/CompactionManager.java | 2 +- .../db/compaction/LeveledManifest.java | 5 +- .../io/sstable/format/SSTableReader.java | 4 +- .../tools/nodetool/GarbageCollect.java | 8 ++- .../apache/cassandra/cql3/GcCompactionTest.java | 73 +++++++++++++++++++- .../LeveledCompactionStrategyTest.java | 33 +++++++++ 8 files changed, 119 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e07099a..83e8b08 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11.4 + * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870) Merged from 3.0: * Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884) * Sstable min/max metadata can cause data loss (CASSANDRA-14861) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index ed98e28..bee4961 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -728,7 +728,7 @@ public class SinglePartitionReadCommand extends ReadCommand * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination * in one pass, and minimize the number of sstables for which we read a partition tombstone. */ - Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); + Collections.sort(view.sstables, SSTableReader.maxTimestampDescending); long mostRecentPartitionTombstone = Long.MIN_VALUE; int nonIntersectingSSTables = 0; List<SSTableReader> skippedSSTablesWithTombstones = null; @@ -916,7 +916,7 @@ public class SinglePartitionReadCommand extends ReadCommand } /* add the SSTables on disk */ - Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); + Collections.sort(view.sstables, SSTableReader.maxTimestampDescending); boolean onlyUnrepaired = true; // read sorted sstables SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 235fe2b..61da975 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -498,7 +498,7 @@ public class CompactionManager implements CompactionManagerMBean if 
(cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones()) originals = Iterables.filter(originals, SSTableReader::isRepaired); List<SSTableReader> sortedSSTables = Lists.newArrayList(originals); - Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator); + Collections.sort(sortedSSTables, SSTableReader.maxTimestampAscending); return sortedSSTables; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index ceb3811..520b08d 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -746,10 +746,11 @@ public class LeveledManifest return sstables; } - private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates) + @VisibleForTesting + List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates) { List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates); - Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator); + Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampAscending); return ageSortedCandidates; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 2f1af58..116d489 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -154,8 +154,8 @@ public abstract class SSTableReader extends 
SSTable implements SelfRefCounted<SS } private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0); - // Descending order - public static final Comparator<SSTableReader> maxTimestampComparator = (o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp()); + public static final Comparator<SSTableReader> maxTimestampDescending = (o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp()); + public static final Comparator<SSTableReader> maxTimestampAscending = (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()); // it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition public static final class UniqueIdentifier {} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java index 37daf09..baa245f 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java @@ -41,8 +41,10 @@ public class GarbageCollect extends NodeToolCmd @Option(title = "jobs", name = {"-j", "--jobs"}, - description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction threads") - private int jobs = 2; + description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction " + + "threads. 
Defaults to 1 so that collections of newer tables can see the data is deleted " + + "and also remove tombstones.") + private int jobs = 1; @Override public void execute(NodeProbe probe) @@ -61,4 +63,4 @@ public class GarbageCollect extends NodeToolCmd } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java index 84a20de..548cdc1 100644 --- a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java +++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.cql3; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -34,6 +35,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.utils.FBUtilities; public class GcCompactionTest extends CQLTester @@ -149,6 +151,75 @@ public class GcCompactionTest extends CQLTester } @Test + public void testGarbageCollectOrder() throws Throwable + { + // partition-level deletions, 0 gc_grace + createTable("CREATE TABLE %s(" + + " key int," + + " column int," + + " col2 int," + + " data int," + + " extra text," + + " PRIMARY KEY((key, column))" + + ") WITH gc_grace_seconds = 0;" + ); + + assertEquals(1, getCurrentColumnFamilyStore().gcBefore(1)); // make sure gc_grace is 0 + + for (int i = 0; i < KEY_COUNT; ++i) + for (int j = 0; j < CLUSTERING_COUNT; ++j) + execute("INSERT INTO %s (key, column, data, 
extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j); + + + Set<SSTableReader> readers = new HashSet<>(); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + flush(); + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader table0 = getNewTable(readers); + assertEquals(0, countTombstoneMarkers(table0)); + int rowCount0 = countRows(table0); + + deleteWithSomeInserts(3, 5, 10); + flush(); + assertEquals(2, cfs.getLiveSSTables().size()); + SSTableReader table1 = getNewTable(readers); + final int rowCount1 = countRows(table1); + assertTrue(rowCount1 > 0); + assertTrue(countTombstoneMarkers(table1) > 0); + + deleteWithSomeInserts(2, 4, 0); + flush(); + assertEquals(3, cfs.getLiveSSTables().size()); + SSTableReader table2 = getNewTable(readers); + assertEquals(0, countRows(table2)); + assertTrue(countTombstoneMarkers(table2) > 0); + + // Wait a little to make sure nowInSeconds is greater than gcBefore + Thread.sleep(1000); + + CompactionManager.AllSSTableOpStatus status = + CompactionManager.instance.performGarbageCollection(getCurrentColumnFamilyStore(), CompactionParams.TombstoneOption.ROW, 1); + assertEquals(CompactionManager.AllSSTableOpStatus.SUCCESSFUL, status); + + SSTableReader[] tables = cfs.getLiveSSTables().toArray(new SSTableReader[0]); + Arrays.sort(tables, (o1, o2) -> Integer.compare(o1.descriptor.generation, o2.descriptor.generation)); // by order of compaction + + // Make sure deleted data was removed + assertTrue(rowCount0 > countRows(tables[0])); + assertTrue(rowCount1 > countRows(tables[1])); + + // Make sure all tombstones got purged + for (SSTableReader t : tables) + { + assertEquals("Table " + t + " has tombstones", 0, countTombstoneMarkers(t)); + } + + // The last table should have become empty and be removed + assertEquals(2, tables.length); + } + + @Test public void testGcCompactionCells() throws Throwable { createTable("CREATE TABLE %s(" + @@ -387,4 +458,4 @@ public class GcCompactionTest extends CQLTester } return 
instances; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index de8efd7..b1d467e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -447,4 +447,37 @@ public class LeveledCompactionStrategyTest // the 11 tables containing key1 should all compact to 1 table assertEquals(1, cfs.getLiveSSTables().size()); } + + @Test + public void testCompactionCandidateOrdering() throws Exception + { + // add some data + byte [] b = new byte[100 * 1024]; + new Random().nextBytes(b); + ByteBuffer value = ByteBuffer.wrap(b); + int rows = 4; + int columns = 10; + // Just keep sstables in L0 for this test + cfs.disableAutoCompaction(); + for (int r = 0; r < rows; r++) + { + UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r)); + for (int c = 0; c < columns; c++) + update.newRow("column" + c).add("val", value); + update.applyUnsafe(); + cfs.forceBlockingFlush(); + } + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0); + // get readers for level 0 sstables + Collection<SSTableReader> sstables = strategy.manifest.getLevel(0); + Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables); + assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1); + long lastMaxTimeStamp = Long.MIN_VALUE; + for (SSTableReader sstable : sortedCandidates) + { + assertTrue(String.format("SStables not 
sorted into oldest to newest by maxTimestamp. Current sstable: %d , last sstable: %d", sstable.getMaxTimestamp(), lastMaxTimeStamp), + sstable.getMaxTimestamp() > lastMaxTimeStamp); + lastMaxTimeStamp = sstable.getMaxTimestamp(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org