Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 d17836dec -> a03424ef9
  refs/heads/trunk caf50de31 -> b80f6c65f


Correct sstable sorting for garbagecollect and levelled compaction

patch by Branimir Lambov and Vincent White; reviewed by Zhao Yang for CASSANDRA-14870


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a03424ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a03424ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a03424ef

Branch: refs/heads/cassandra-3.11
Commit: a03424ef95559c9df2bb7f86e1ac1edca1436058
Parents: d17836d
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Nov 7 13:10:39 2018 +0200
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Tue Nov 13 12:50:08 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/SinglePartitionReadCommand.java          |  4 +-
 .../db/compaction/CompactionManager.java        |  2 +-
 .../db/compaction/LeveledManifest.java          |  5 +-
 .../io/sstable/format/SSTableReader.java        |  4 +-
 .../tools/nodetool/GarbageCollect.java          |  8 ++-
 .../apache/cassandra/cql3/GcCompactionTest.java | 73 +++++++++++++++++++-
 .../LeveledCompactionStrategyTest.java          | 33 +++++++++
 8 files changed, 119 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e07099a..83e8b08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.4
+ * Correct sstable sorting for garbagecollect and levelled compaction 
(CASSANDRA-14870)
 Merged from 3.0:
  * Move TWCS message 'No compaction necessary for bucket size' to Trace level 
(CASSANDRA-14884)
  * Sstable min/max metadata can cause data loss (CASSANDRA-14861)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index ed98e28..bee4961 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -728,7 +728,7 @@ public class SinglePartitionReadCommand extends ReadCommand
              * In other words, iterating in maxTimestamp order allow to do our 
mostRecentPartitionTombstone elimination
              * in one pass, and minimize the number of sstables for which we 
read a partition tombstone.
              */
-            Collections.sort(view.sstables, 
SSTableReader.maxTimestampComparator);
+            Collections.sort(view.sstables, 
SSTableReader.maxTimestampDescending);
             long mostRecentPartitionTombstone = Long.MIN_VALUE;
             int nonIntersectingSSTables = 0;
             List<SSTableReader> skippedSSTablesWithTombstones = null;
@@ -916,7 +916,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         }
 
         /* add the SSTables on disk */
-        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+        Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
         boolean onlyUnrepaired = true;
         // read sorted sstables
         SSTableReadMetricsCollector metricsCollector = new 
SSTableReadMetricsCollector();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 235fe2b..61da975 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -498,7 +498,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                 if 
(cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
                     originals = Iterables.filter(originals, 
SSTableReader::isRepaired);
                 List<SSTableReader> sortedSSTables = 
Lists.newArrayList(originals);
-                Collections.sort(sortedSSTables, 
SSTableReader.maxTimestampComparator);
+                Collections.sort(sortedSSTables, 
SSTableReader.maxTimestampAscending);
                 return sortedSSTables;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index ceb3811..520b08d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -746,10 +746,11 @@ public class LeveledManifest
         return sstables;
     }
 
-    private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> 
candidates)
+    @VisibleForTesting
+    List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
     {
         List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates);
-        Collections.sort(ageSortedCandidates, 
SSTableReader.maxTimestampComparator);
+        Collections.sort(ageSortedCandidates, 
SSTableReader.maxTimestampAscending);
         return ageSortedCandidates;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 2f1af58..116d489 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -154,8 +154,8 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
     }
     private static final RateLimiter meterSyncThrottle = 
RateLimiter.create(100.0);
 
-    // Descending order
-    public static final Comparator<SSTableReader> maxTimestampComparator = 
(o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp());
+    public static final Comparator<SSTableReader> maxTimestampDescending = 
(o1, o2) -> Long.compare(o2.getMaxTimestamp(), o1.getMaxTimestamp());
+    public static final Comparator<SSTableReader> maxTimestampAscending = (o1, 
o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp());
 
     // it's just an object, which we use regular Object equality on; we 
introduce a special class just for easy recognition
     public static final class UniqueIdentifier {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java 
b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
index 37daf09..baa245f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java
@@ -41,8 +41,10 @@ public class GarbageCollect extends NodeToolCmd
 
     @Option(title = "jobs",
             name = {"-j", "--jobs"},
-            description = "Number of sstables to cleanup simultanously, set to 
0 to use all available compaction threads")
-    private int jobs = 2;
+            description = "Number of sstables to cleanup simultanously, set to 
0 to use all available compaction " +
+                          "threads. Defaults to 1 so that collections of newer 
tables can see the data is deleted " +
+                          "and also remove tombstones.")
+    private int jobs = 1;
 
     @Override
     public void execute(NodeProbe probe)
@@ -61,4 +63,4 @@ public class GarbageCollect extends NodeToolCmd
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java 
b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
index 84a20de..548cdc1 100644
--- a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.cql3;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -34,6 +35,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class GcCompactionTest extends CQLTester
@@ -149,6 +151,75 @@ public class GcCompactionTest extends CQLTester
     }
 
     @Test
+    public void testGarbageCollectOrder() throws Throwable
+    {
+        // partition-level deletions, 0 gc_grace
+        createTable("CREATE TABLE %s(" +
+                    "  key int," +
+                    "  column int," +
+                    "  col2 int," +
+                    "  data int," +
+                    "  extra text," +
+                    "  PRIMARY KEY((key, column))" +
+                    ") WITH gc_grace_seconds = 0;"
+        );
+
+        assertEquals(1, getCurrentColumnFamilyStore().gcBefore(1)); // make 
sure gc_grace is 0
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, 
?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int rowCount0 = countRows(table0);
+
+        deleteWithSomeInserts(3, 5, 10);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        final int rowCount1 = countRows(table1);
+        assertTrue(rowCount1 > 0);
+        assertTrue(countTombstoneMarkers(table1) > 0);
+
+        deleteWithSomeInserts(2, 4, 0);
+        flush();
+        assertEquals(3, cfs.getLiveSSTables().size());
+        SSTableReader table2 = getNewTable(readers);
+        assertEquals(0, countRows(table2));
+        assertTrue(countTombstoneMarkers(table2) > 0);
+
+        // Wait a little to make sure nowInSeconds is greater than gcBefore
+        Thread.sleep(1000);
+
+        CompactionManager.AllSSTableOpStatus status =
+                
CompactionManager.instance.performGarbageCollection(getCurrentColumnFamilyStore(),
 CompactionParams.TombstoneOption.ROW, 1);
+        assertEquals(CompactionManager.AllSSTableOpStatus.SUCCESSFUL, status);
+
+        SSTableReader[] tables = cfs.getLiveSSTables().toArray(new 
SSTableReader[0]);
+        Arrays.sort(tables, (o1, o2) -> 
Integer.compare(o1.descriptor.generation, o2.descriptor.generation));  // by 
order of compaction
+
+        // Make sure deleted data was removed
+        assertTrue(rowCount0 > countRows(tables[0]));
+        assertTrue(rowCount1 > countRows(tables[1]));
+
+        // Make sure all tombstones got purged
+        for (SSTableReader t : tables)
+        {
+            assertEquals("Table " + t + " has tombstones", 0, 
countTombstoneMarkers(t));
+        }
+
+        // The last table should have become empty and be removed
+        assertEquals(2, tables.length);
+    }
+
+    @Test
     public void testGcCompactionCells() throws Throwable
     {
         createTable("CREATE TABLE %s(" +
@@ -387,4 +458,4 @@ public class GcCompactionTest extends CQLTester
         }
         return instances;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a03424ef/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index de8efd7..b1d467e 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -447,4 +447,37 @@ public class LeveledCompactionStrategyTest
         // the 11 tables containing key1 should all compact to 1 table
         assertEquals(1, cfs.getLiveSSTables().size());
     }
+
+    @Test
+    public void testCompactionCandidateOrdering() throws Exception
+    {
+        // add some data
+        byte [] b = new byte[100 * 1024];
+        new Random().nextBytes(b);
+        ByteBuffer value = ByteBuffer.wrap(b);
+        int rows = 4;
+        int columns = 10;
+        // Just keep sstables in L0 for this test
+        cfs.disableAutoCompaction();
+        for (int r = 0; r < rows; r++)
+        {
+            UpdateBuilder update = UpdateBuilder.create(cfs.metadata, 
String.valueOf(r));
+            for (int c = 0; c < columns; c++)
+                update.newRow("column" + c).add("val", value);
+            update.applyUnsafe();
+            cfs.forceBlockingFlush();
+        }
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) 
(cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0);
+        // get readers for level 0 sstables
+        Collection<SSTableReader> sstables = strategy.manifest.getLevel(0);
+        Collection<SSTableReader> sortedCandidates = 
strategy.manifest.ageSortedSSTables(sstables);
+        assertTrue(String.format("More than 1 sstable required for test, 
found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1);
+        long lastMaxTimeStamp = Long.MIN_VALUE;
+        for (SSTableReader sstable : sortedCandidates)
+        {
+            assertTrue(String.format("SStables not sorted into oldest to 
newest by maxTimestamp. Current sstable: %d , last sstable: %d", 
sstable.getMaxTimestamp(), lastMaxTimeStamp),
+                       sstable.getMaxTimestamp() > lastMaxTimeStamp);
+            lastMaxTimeStamp = sstable.getMaxTimestamp();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to