Pre-create deletion log records to finish compactions quicker

Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-12763


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d73f45ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d73f45ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d73f45ba

Branch: refs/heads/trunk
Commit: d73f45bad4cd6d8cf1cea7d9b35b76075dc277e1
Parents: f39e28d
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Dec 11 15:11:20 2017 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Feb 19 07:52:20 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/lifecycle/Helpers.java  | 19 ++-----
 .../apache/cassandra/db/lifecycle/LogFile.java  | 37 +++++++++++--
 .../cassandra/db/lifecycle/LogRecord.java       | 11 ++--
 .../cassandra/db/lifecycle/LogTransaction.java  | 33 ++++++------
 .../apache/cassandra/db/lifecycle/Tracker.java  |  2 +-
 .../cassandra/db/lifecycle/HelpersTest.java     | 56 +++++---------------
 7 files changed, 74 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 14a62a4..8cf665e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Pre-create deletion log records to finish compactions quicker 
(CASSANDRA-12763)
 Merged from 2.1:
  * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt 
(CASSANDRA-14183)
 Merged from 2.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java 
b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index b9adc4b..8e0d514 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -22,6 +22,7 @@ import java.util.*;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.Throwables;
 
@@ -127,11 +128,12 @@ class Helpers
 
     static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, 
LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable 
accumulate)
     {
+        Map<SSTable, LogRecord> logRecords = 
txnLogs.makeRemoveRecords(readers);
         for (SSTableReader reader : readers)
         {
             try
             {
-                obsoletions.add(new LogTransaction.Obsoletion(reader, 
txnLogs.obsoleted(reader)));
+                obsoletions.add(new LogTransaction.Obsoletion(reader, 
txnLogs.obsoleted(reader, logRecords.get(reader))));
             }
             catch (Throwable t)
             {
@@ -141,21 +143,6 @@ class Helpers
         return accumulate;
     }
 
-    static Throwable prepareForBulkObsoletion(Iterable<SSTableReader> readers, 
LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable 
accumulate)
-    {
-        try
-        {
-            for (Map.Entry<SSTableReader, LogTransaction.SSTableTidier> entry 
: txnLogs.bulkObsoletion(readers).entrySet())
-                obsoletions.add(new LogTransaction.Obsoletion(entry.getKey(), 
entry.getValue()));
-        }
-        catch (Throwable t)
-        {
-            accumulate = Throwables.merge(accumulate, t);
-        }
-
-        return accumulate;
-    }
-
     static Throwable abortObsoletion(List<LogTransaction.Obsoletion> 
obsoletions, Throwable accumulate)
     {
         if (obsoletions == null || obsoletions.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index be26163..8425a6d 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -281,18 +281,23 @@ final class LogFile implements AutoCloseable
 
     void add(Type type, SSTable table)
     {
-        if (!addRecord(makeRecord(type, table)))
+        add(makeRecord(type, table));
+    }
+
+    void add(LogRecord record)
+    {
+        if (!addRecord(record))
             throw new IllegalStateException();
     }
 
     public void addAll(Type type, Iterable<SSTableReader> toBulkAdd)
     {
-        for (LogRecord record : makeRecords(type, toBulkAdd))
+        for (LogRecord record : makeRecords(type, toBulkAdd).values())
             if (!addRecord(record))
                 throw new IllegalStateException();
     }
 
-    private Collection<LogRecord> makeRecords(Type type, 
Iterable<SSTableReader> tables)
+    Map<SSTable, LogRecord> makeRecords(Type type, Iterable<SSTableReader> 
tables)
     {
         assert type == Type.ADD || type == Type.REMOVE;
 
@@ -313,6 +318,20 @@ final class LogFile implements AutoCloseable
         return LogRecord.make(type, table);
     }
 
+    /**
+     * this version of makeRecord takes an existing LogRecord and converts it 
to a
+     * record with the given type. This avoids listing the directory and if the
+     * LogRecord already exists, we have all components for the sstable
+     */
+    private LogRecord makeRecord(Type type, SSTable table, LogRecord record)
+    {
+        assert type == Type.ADD || type == Type.REMOVE;
+
+        File folder = table.descriptor.directory;
+        replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        return record.asType(type);
+    }
+
     private boolean addRecord(LogRecord record)
     {
         if (records.contains(record))
@@ -334,7 +353,17 @@ final class LogFile implements AutoCloseable
 
     boolean contains(Type type, SSTable table)
     {
-        return records.contains(makeRecord(type, table));
+        return contains(makeRecord(type, table));
+    }
+
+    boolean contains(Type type, SSTable sstable, LogRecord record)
+    {
+        return contains(makeRecord(type, sstable, record));
+    }
+
+    private boolean contains(LogRecord record)
+    {
+        return records.contains(record);
     }
 
     void deleteFilesForRecordsOfType(Type type)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index a322ea1..1dc17f6 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -158,7 +158,7 @@ final class LogRecord
         return make(type, getExistingFiles(absoluteTablePath), 
table.getAllFilePaths().size(), absoluteTablePath);
     }
 
-    public static Collection<LogRecord> make(Type type, 
Iterable<SSTableReader> tables)
+    public static Map<SSTable, LogRecord> make(Type type, 
Iterable<SSTableReader> tables)
     {
         // contains a mapping from sstable absolute path (everything up until 
the 'Data'/'Index'/etc part of the filename) to the sstable
         Map<String, SSTable> absolutePaths = new HashMap<>();
@@ -167,13 +167,13 @@ final class LogRecord
 
         // maps sstable base file name to the actual files on disk
         Map<String, List<File>> existingFiles = 
getExistingFiles(absolutePaths.keySet());
-        List<LogRecord> records = new ArrayList<>(existingFiles.size());
+        Map<SSTable, LogRecord> records = new HashMap<>(existingFiles.size());
         for (Map.Entry<String, List<File>> entry : existingFiles.entrySet())
         {
             List<File> filesOnDisk = entry.getValue();
             String baseFileName = entry.getKey();
             SSTable sstable = absolutePaths.get(baseFileName);
-            records.add(make(type, filesOnDisk, 
sstable.getAllFilePaths().size(), baseFileName));
+            records.put(sstable, make(type, filesOnDisk, 
sstable.getAllFilePaths().size(), baseFileName));
         }
         return records;
     }
@@ -415,4 +415,9 @@ final class LogRecord
         FBUtilities.updateChecksumInt(crc32, numFiles);
         return crc32.getValue() & (Long.MAX_VALUE);
     }
+
+    LogRecord asType(Type type)
+    {
+        return new LogRecord(type, absolutePath.orElse(null), updateTime, 
numFiles);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 6599142..a10bcd2 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -143,19 +143,28 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
     }
 
     /**
+     * helper method for tests, creates the remove records per sstable
+     */
+    @VisibleForTesting
+    SSTableTidier obsoleted(SSTableReader sstable)
+    {
+        return obsoleted(sstable, LogRecord.make(Type.REMOVE, sstable));
+    }
+
+    /**
      * Schedule a reader for deletion as soon as it is fully unreferenced.
      */
-    SSTableTidier obsoleted(SSTableReader reader)
+    SSTableTidier obsoleted(SSTableReader reader, LogRecord logRecord)
     {
-        if (txnFile.contains(Type.ADD, reader))
+        if (txnFile.contains(Type.ADD, reader, logRecord))
         {
-            if (txnFile.contains(Type.REMOVE, reader))
+            if (txnFile.contains(Type.REMOVE, reader, logRecord))
                 throw new IllegalArgumentException();
 
             return new SSTableTidier(reader, true, this);
         }
 
-        txnFile.add(Type.REMOVE, reader);
+        txnFile.add(logRecord);
 
         if (tracker != null)
             tracker.notifyDeleting(reader);
@@ -163,22 +172,12 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
         return new SSTableTidier(reader, false, this);
     }
 
-    Map<SSTableReader, SSTableTidier> bulkObsoletion(Iterable<SSTableReader> 
sstables)
+    Map<SSTable, LogRecord> makeRemoveRecords(Iterable<SSTableReader> sstables)
     {
-        if (!txnFile.isEmpty())
-            throw new IllegalStateException("Bad state when doing bulk 
obsoletions");
-
-        txnFile.addAll(Type.REMOVE, sstables);
-        Map<SSTableReader, SSTableTidier> tidiers = new HashMap<>();
-        for (SSTableReader sstable : sstables)
-        {
-            if (tracker != null)
-                tracker.notifyDeleting(sstable);
-            tidiers.put(sstable, new SSTableTidier(sstable, false, this));
-        }
-        return tidiers;
+        return txnFile.makeRecords(Type.REMOVE, sstables);
     }
 
+
     OperationType type()
     {
         return txnFile.type();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java 
b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index d281278..9feaa3e 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -245,7 +245,7 @@ public class Tracker
             // It is important that any method accepting/returning a Throwable 
never throws an exception, and does its best
             // to complete the instructions given to it
             List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-            accumulate = prepareForBulkObsoletion(removed, txnLogs, 
obsoletions, accumulate);
+            accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, 
accumulate);
             try
             {
                 txnLogs.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java 
b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 1b8e265..1d9f8aa 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -18,21 +18,14 @@
 */
 package org.apache.cassandra.db.lifecycle;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -45,6 +38,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class HelpersTest
 {
@@ -165,23 +159,13 @@ public class HelpersTest
     @Test
     public void testMarkObsolete()
     {
-        testMarkObsoleteHelper(false);
-    }
-    @Test
-    public void testBulkMarkObsolete()
-    {
-        testMarkObsoleteHelper(true);
-    }
-
-    public void testMarkObsoleteHelper(boolean bulk)
-    {
         ColumnFamilyStore cfs = MockSchema.newCFS();
         LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
         Iterable<SSTableReader> readers = 
Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
         Iterable<SSTableReader> readersToKeep = 
Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
 
         List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-        Assert.assertNull(bulk ? Helpers.prepareForBulkObsoletion(readers, 
txnLogs, obsoletions, null) : Helpers.prepareForObsoletion(readers, txnLogs, 
obsoletions, null));
+        Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null);
         assertNotNull(obsoletions);
         assertEquals(2, obsoletions.size());
 
@@ -200,37 +184,21 @@ public class HelpersTest
     }
 
     @Test
-    public void compareBulkAndNormalObsolete() throws IOException
+    public void testObsoletionPerformance()
     {
         ColumnFamilyStore cfs = MockSchema.newCFS();
         LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
-        LogTransaction txnLogs2 = new LogTransaction(OperationType.UNKNOWN);
-
-        Collection<SSTableReader> readers = 
Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
-        // add a few readers that should not be removed:
-        Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, 
cfs));
+        List<SSTableReader> readers = new ArrayList<>();
 
-        List<LogTransaction.Obsoletion> normalObsoletions = new ArrayList<>();
-        List<LogTransaction.Obsoletion> bulkObsoletions = new ArrayList<>();
-
-        Assert.assertNull(Helpers.prepareForBulkObsoletion(readers, txnLogs, 
normalObsoletions, null));
-        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs2, 
bulkObsoletions, null));
-
-        assertEquals(Sets.newHashSet(readers), 
normalObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
-        assertEquals(Sets.newHashSet(readers), 
bulkObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
-
-        Set<String> normalLogRecords = new HashSet<>();
-        Set<String> bulkLogRecords = new HashSet<>();
-
-        for (File f : txnLogs.logFiles())
-            Files.lines(f.toPath()).forEach(bulkLogRecords::add);
-        for (File f : txnLogs2.logFiles())
-            Files.lines(f.toPath()).forEach(normalLogRecords::add);
-
-        Assert.assertEquals(readers.size(), normalLogRecords.size());
-        Assert.assertEquals(bulkLogRecords, normalLogRecords);
+        for (int i = 0; i < 10000; i++)
+        {
+            readers.add(MockSchema.sstable(i + 1, cfs));
+        }
+        long start = System.currentTimeMillis();
 
+        Helpers.prepareForObsoletion(readers.subList(0, 500), txnLogs, new 
ArrayList<>(),null );
         txnLogs.finish();
-        txnLogs2.finish();
+        long time = System.currentTimeMillis() - start;
+        assertTrue(time < 20000);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to