This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 9af57a5  Filter sstables earlier when running cleanup
9af57a5 is described below

commit 9af57a508da637f85b32ada0f54e91c72aca0104
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Thu Apr 25 12:31:25 2019 +0200

    Filter sstables earlier when running cleanup
    
    Patch by marcuse; reviewed by Jordan West for CASSANDRA-15100
---
 CHANGES.txt                                        |  1 +
 .../cassandra/db/compaction/CompactionManager.java | 30 +++++++++---
 test/unit/org/apache/cassandra/db/CleanupTest.java | 53 ++++++++++++++++++----
 3 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c2bed92..a46a327 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.19
+ * Filter sstables earlier when running cleanup (CASSANDRA-15100)
  * Use mean row count instead of mean column count for index selectivity 
calculation (CASSANDRA-15259)
  * Avoid updating unchanged gossip states (CASSANDRA-15097)
  * Prevent recreation of previously dropped columns with a different kind 
(CASSANDRA-14948)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1bd8ff3..694ad62 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -285,6 +285,7 @@ public class CompactionManager implements 
CompactionManagerMBean
     @SuppressWarnings("resource")
     private AllSSTableOpStatus parallelAllSSTableOperation(final 
ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, 
OperationType operationType) throws ExecutionException, InterruptedException
     {
+        logger.info("Starting {} for {}.{}", operationType, 
cfs.keyspace.getName(), cfs.getTableName());
         List<LifecycleTransaction> transactions = new ArrayList<>();
         List<Future<?>> futures = new ArrayList<>();
         try (LifecycleTransaction compacting = 
cfs.markAllCompacting(operationType))
@@ -326,6 +327,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             }
             FBUtilities.waitOnFutures(futures);
             assert compacting.originals().isEmpty();
+            logger.info("Finished {} for {}.{} successfully", operationType, 
cfs.keyspace.getName(), cfs.getTableName());
             return AllSSTableOpStatus.SUCCESSFUL;
         }
         finally
@@ -341,7 +343,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             }
             Throwable fail = Throwables.close(null, transactions);
             if (fail != null)
-                logger.error("Failed to cleanup lifecycle transactions", fail);
+                logger.error("Failed to cleanup lifecycle transactions ({} for 
{}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail);
         }
     }
 
@@ -463,7 +465,25 @@ public class CompactionManager implements 
CompactionManagerMBean
             public Iterable<SSTableReader> filterSSTables(LifecycleTransaction 
transaction)
             {
                 List<SSTableReader> sortedSSTables = 
Lists.newArrayList(transaction.originals());
-                Collections.sort(sortedSSTables, new 
SSTableReader.SizeComparator());
+                Iterator<SSTableReader> sstableIter = 
sortedSSTables.iterator();
+                int totalSSTables = 0;
+                int skippedSStables = 0;
+                while (sstableIter.hasNext())
+                {
+                    SSTableReader sstable = sstableIter.next();
+                    totalSSTables++;
+                    if (!needsCleanup(sstable, ranges))
+                    {
+                        logger.debug("Not cleaning up {} ([{}, {}]) - no 
tokens outside owned ranges {}",
+                                     sstable, sstable.first.getToken(), 
sstable.last.getToken(), ranges);
+                        sstableIter.remove();
+                        transaction.cancel(sstable);
+                        skippedSStables++;
+                    }
+                }
+                logger.info("Skipping cleanup for {}/{} sstables for {}.{} 
since they are fully contained in owned ranges ({})",
+                            skippedSStables, totalSSTables, 
cfStore.keyspace.getName(), cfStore.getTableName(), ranges);
+                sortedSSTables.sort(new SSTableReader.SizeComparator());
                 return sortedSSTables;
             }
 
@@ -886,11 +906,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         {
             txn.obsoleteOriginals();
             txn.finish();
-            return;
-        }
-        if (!needsCleanup(sstable, ranges))
-        {
-            logger.trace("Skipping {} for cleanup; all rows should be kept", 
sstable);
+            logger.info("SSTable {} ([{}, {}]) does not intersect the owned 
ranges ({}), dropping it", sstable, sstable.first.getToken(), 
sstable.last.getToken(), ranges);
             return;
         }
 
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java 
b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 99030c5..d4c613d 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -28,10 +28,12 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -68,6 +70,9 @@ public class CleanupTest
     public static final String CF_INDEXED2 = "Indexed2";
     public static final String CF_STANDARD2 = "Standard2";
 
+    public static final String KEYSPACE3 = "CleanupSkipSSTables";
+    public static final String CF_STANDARD3 = "Standard3";
+
     public static final ByteBuffer COLUMN = ByteBufferUtil.bytes("birthdate");
     public static final ByteBuffer VALUE = ByteBuffer.allocate(8);
     static
@@ -105,9 +110,11 @@ public class CleanupTest
                                     KeyspaceParams.nts("DC1", 1),
                                     SchemaLoader.standardCFMD(KEYSPACE2, 
CF_STANDARD2),
                                     SchemaLoader.compositeIndexCFMD(KEYSPACE2, 
CF_INDEXED2, true));
+        SchemaLoader.createKeyspace(KEYSPACE3,
+                                    KeyspaceParams.nts("DC1", 1),
+                                    SchemaLoader.standardCFMD(KEYSPACE3, 
CF_STANDARD3));
     }
 
-    /*
     @Test
     public void testCleanup() throws ExecutionException, InterruptedException
     {
@@ -116,17 +123,13 @@ public class CleanupTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
 
-        UnfilteredPartitionIterator iter;
-
         // insert data and verify we get it back w/ range query
         fillCF(cfs, "val", LOOPS);
 
         // record max timestamps of the sstables pre-cleanup
         List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
 
-        iter = Util.getRangeSlice(cfs);
-        assertEquals(LOOPS, Iterators.size(iter));
-
+        assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
         // with one token in the ring, owned by the local node, cleanup should 
be a no-op
         CompactionManager.instance.performCleanup(cfs, 2);
 
@@ -134,10 +137,8 @@ public class CleanupTest
         assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
 
         // check data is still there
-        iter = Util.getRangeSlice(cfs);
-        assertEquals(LOOPS, Iterators.size(iter));
+        assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
     }
-    */
 
     @Test
     public void testCleanupWithIndexes() throws IOException, 
ExecutionException, InterruptedException
@@ -234,6 +235,40 @@ public class CleanupTest
         assertTrue(cfs.getLiveSSTables().isEmpty());
     }
 
+    @Test
+    public void testCleanupSkippingSSTables() throws UnknownHostException, 
ExecutionException, InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE3);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD3);
+        cfs.disableAutoCompaction();
+        for (byte i = 0; i < 100; i++)
+        {
+            new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 
ByteBuffer.wrap(new byte[] {i}))
+                .clustering(COLUMN)
+                .add("val", VALUE)
+                .build()
+                .applyUnsafe();
+            cfs.forceBlockingFlush();
+        }
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        tmd.updateHostId(UUID.randomUUID(), 
InetAddress.getByName("127.0.0.1"));
+        tmd.updateNormalToken(token(new byte[] {50}), 
InetAddress.getByName("127.0.0.1"));
+        Set<SSTableReader> beforeFirstCleanup = 
Sets.newHashSet(cfs.getLiveSSTables());
+        // single token - 127.0.0.1 owns everything, cleanup should be noop
+        cfs.forceCleanup(2);
+        assertEquals(beforeFirstCleanup, cfs.getLiveSSTables());
+        tmd.updateNormalToken(token(new byte[] {120}), 
InetAddress.getByName("127.0.0.2"));
+        cfs.forceCleanup(2);
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            assertEquals(sstable.first, sstable.last); // single-token sstables
+            assertTrue(sstable.first.getToken().compareTo(token(new 
byte[]{50})) <= 0);
+            // with single-token sstables they should all either be skipped or 
dropped:
+            assertTrue(beforeFirstCleanup.contains(sstable));
+        }
+    }
+
 
     @Test
     public void testNeedsCleanup() throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to