This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new 9af57a5 Filter sstables earlier when running cleanup 9af57a5 is described below commit 9af57a508da637f85b32ada0f54e91c72aca0104 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu Apr 25 12:31:25 2019 +0200 Filter sstables earlier when running cleanup Patch by marcuse; reviewed by Jordan West for CASSANDRA-15100 --- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionManager.java | 30 +++++++++--- test/unit/org/apache/cassandra/db/CleanupTest.java | 53 ++++++++++++++++++---- 3 files changed, 68 insertions(+), 16 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c2bed92..a46a327 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.19 + * Filter sstables earlier when running cleanup (CASSANDRA-15100) * Use mean row count instead of mean column count for index selectivity calculation (CASSANDRA-15259) * Avoid updating unchanged gossip states (CASSANDRA-15097) * Prevent recreation of previously dropped columns with a different kind (CASSANDRA-14948) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 1bd8ff3..694ad62 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -285,6 +285,7 @@ public class CompactionManager implements CompactionManagerMBean @SuppressWarnings("resource") private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) throws ExecutionException, InterruptedException { + logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName()); List<LifecycleTransaction> transactions = new ArrayList<>(); List<Future<?>> futures = new ArrayList<>(); try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType)) @@ -326,6 +327,7 @@ public class CompactionManager implements CompactionManagerMBean } FBUtilities.waitOnFutures(futures); assert compacting.originals().isEmpty(); + logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName()); return AllSSTableOpStatus.SUCCESSFUL; } finally @@ -341,7 +343,7 @@ public class CompactionManager implements CompactionManagerMBean } Throwable fail = Throwables.close(null, transactions); if (fail != null) - logger.error("Failed to cleanup lifecycle transactions", fail); + logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail); } } @@ -463,7 +465,25 @@ public class CompactionManager implements CompactionManagerMBean public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) { List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals()); - Collections.sort(sortedSSTables, new SSTableReader.SizeComparator()); + Iterator<SSTableReader> sstableIter = sortedSSTables.iterator(); + int totalSSTables = 0; + int skippedSStables = 0; + while (sstableIter.hasNext()) + { + SSTableReader sstable = sstableIter.next(); + totalSSTables++; + if (!needsCleanup(sstable, ranges)) + { + logger.debug("Not cleaning up {} ([{}, {}]) - no tokens outside owned ranges {}", + sstable, sstable.first.getToken(), sstable.last.getToken(), ranges); + sstableIter.remove(); + transaction.cancel(sstable); + skippedSStables++; + } + } + logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges ({})", + skippedSStables, totalSSTables, cfStore.keyspace.getName(), cfStore.getTableName(), ranges); + sortedSSTables.sort(new SSTableReader.SizeComparator()); return sortedSSTables; } @@ -886,11 +906,7 @@ public class CompactionManager implements CompactionManagerMBean { txn.obsoleteOriginals(); txn.finish(); - return; - } - if (!needsCleanup(sstable, ranges)) - { - logger.trace("Skipping {} for cleanup; all rows should be kept", sstable); + logger.info("SSTable {} ([{}, {}]) does not intersect the owned ranges ({}), dropping it", sstable, sstable.first.getToken(), sstable.last.getToken(), ranges); return; } diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java index 99030c5..d4c613d 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTest.java @@ -28,10 +28,12 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; import org.junit.BeforeClass; import org.junit.Test; @@ -68,6 +70,9 @@ public class CleanupTest public static final String CF_INDEXED2 = "Indexed2"; public static final String CF_STANDARD2 = "Standard2"; + public static final String KEYSPACE3 = "CleanupSkipSSTables"; + public static final String CF_STANDARD3 = "Standard3"; + public static final ByteBuffer COLUMN = ByteBufferUtil.bytes("birthdate"); public static final ByteBuffer VALUE = ByteBuffer.allocate(8); static @@ -105,9 +110,11 @@ public class CleanupTest KeyspaceParams.nts("DC1", 1), SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD2), SchemaLoader.compositeIndexCFMD(KEYSPACE2, CF_INDEXED2, true)); + SchemaLoader.createKeyspace(KEYSPACE3, + KeyspaceParams.nts("DC1", 1), + SchemaLoader.standardCFMD(KEYSPACE3, CF_STANDARD3)); } - /* @Test public void testCleanup() throws ExecutionException, InterruptedException { @@ -116,17 +123,13 @@ public class CleanupTest Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); - UnfilteredPartitionIterator iter; - // insert data and verify we get it back w/ range query fillCF(cfs, "val", LOOPS); // record max timestamps of the sstables pre-cleanup List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs); - iter = Util.getRangeSlice(cfs); - assertEquals(LOOPS, Iterators.size(iter)); - + assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size()); // with one token in the ring, owned by the local node, cleanup should be a no-op CompactionManager.instance.performCleanup(cfs, 2); @@ -134,10 +137,8 @@ public class CleanupTest assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs)); // check data is still there - iter = Util.getRangeSlice(cfs); - assertEquals(LOOPS, Iterators.size(iter)); + assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size()); } - */ @Test public void testCleanupWithIndexes() throws IOException, ExecutionException, InterruptedException @@ -234,6 +235,40 @@ public class CleanupTest assertTrue(cfs.getLiveSSTables().isEmpty()); } + @Test + public void testCleanupSkippingSSTables() throws UnknownHostException, ExecutionException, InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE3); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD3); + cfs.disableAutoCompaction(); + for (byte i = 0; i < 100; i++) + { + new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), ByteBuffer.wrap(new byte[] {i})) + .clustering(COLUMN) + .add("val", VALUE) + .build() + .applyUnsafe(); + cfs.forceBlockingFlush(); + } + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.clearUnsafe(); + tmd.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.0.0.1")); + tmd.updateNormalToken(token(new byte[] {50}), InetAddress.getByName("127.0.0.1")); + Set<SSTableReader> beforeFirstCleanup = Sets.newHashSet(cfs.getLiveSSTables()); + // single token - 127.0.0.1 owns everything, cleanup should be noop + cfs.forceCleanup(2); + assertEquals(beforeFirstCleanup, cfs.getLiveSSTables()); + tmd.updateNormalToken(token(new byte[] {120}), InetAddress.getByName("127.0.0.2")); + cfs.forceCleanup(2); + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + assertEquals(sstable.first, sstable.last); // single-token sstables + assertTrue(sstable.first.getToken().compareTo(token(new byte[]{50})) <= 0); + // with single-token sstables they should all either be skipped or dropped: + assertTrue(beforeFirstCleanup.contains(sstable)); + } + } + @Test public void testNeedsCleanup() throws Exception --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org