Improve TRUNCATE performance

Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-13909
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b32a9e64 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b32a9e64 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b32a9e64 Branch: refs/heads/cassandra-3.11 Commit: b32a9e6452c78e6ad08e371314bf1ab7492d0773 Parents: 15cee48 Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Sep 25 14:44:37 2017 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Oct 2 09:29:22 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/lifecycle/Helpers.java | 15 +++++ .../apache/cassandra/db/lifecycle/LogFile.java | 25 ++++++++ .../cassandra/db/lifecycle/LogRecord.java | 65 +++++++++++++++++++- .../cassandra/db/lifecycle/LogTransaction.java | 16 +++++ .../apache/cassandra/db/lifecycle/Tracker.java | 2 +- .../cassandra/db/lifecycle/HelpersTest.java | 58 ++++++++++++++++- 7 files changed, 179 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4a45469..d6423b4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Improve TRUNCATE performance (CASSANDRA-13909) * Implement short read protection on partition boundaries (CASSANDRA-13595) * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911) * Filter header only commit logs before recovery (CASSANDRA-13918) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/Helpers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java 
index f9555f4..b9adc4b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -141,6 +141,21 @@ class Helpers
         return accumulate;
     }
 
+    static Throwable prepareForBulkObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
+    {
+        try
+        {
+            for (Map.Entry<SSTableReader, LogTransaction.SSTableTidier> entry : txnLogs.bulkObsoletion(readers).entrySet())
+                obsoletions.add(new LogTransaction.Obsoletion(entry.getKey(), entry.getValue()));
+        }
+        catch (Throwable t)
+        {
+            accumulate = Throwables.merge(accumulate, t);
+        }
+
+        return accumulate;
+    }
+
     static Throwable abortObsoletion(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
     {
         if (obsoletions == null || obsoletions.isEmpty())


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index da5bb39..be26163 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.utils.Throwables;
 
@@ -284,6 +285,25 @@ final class LogFile implements AutoCloseable
             throw new IllegalStateException();
     }
 
+    public void addAll(Type type, Iterable<SSTableReader> toBulkAdd)
+    {
+        for (LogRecord record : makeRecords(type, toBulkAdd))
+            if (!addRecord(record))
+                throw new IllegalStateException();
+    }
+
+    private Collection<LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables)
+    {
+        assert type == Type.ADD || type == Type.REMOVE;
+
+        for (SSTableReader sstable : tables)
+        {
+            File folder = sstable.descriptor.directory;
+            replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        }
+        return LogRecord.make(type, tables);
+    }
+
     private LogRecord makeRecord(Type type, SSTable table)
     {
         assert type == Type.ADD || type == Type.REMOVE;
@@ -414,4 +434,9 @@ final class LogFile implements AutoCloseable
     {
         return records;
     }
+
+    public boolean isEmpty()
+    {
+        return records.isEmpty();
+    }
 }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index ac6d6d0..a322ea1 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.lifecycle;
 
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
@@ -30,7 +31,9 @@ import java.util.stream.Collectors;
 import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -151,10 +154,35 @@ final class LogRecord
         // there is no separator after the generation number, and this would cause files of sstables with
         // a higher generation number that starts with the same number, to be incorrectly classified as files
         // of this record sstable
-        String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename() + Component.separator);
+        String absoluteTablePath = absolutePath(table.descriptor.baseFilename());
         return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
     }
 
+    public static Collection<LogRecord> make(Type type, Iterable<SSTableReader> tables)
+    {
+        // contains a mapping from sstable absolute path (everything up until the 'Data'/'Index'/etc part of the filename) to the sstable
+        Map<String, SSTable> absolutePaths = new HashMap<>();
+        for (SSTableReader table : tables)
+            absolutePaths.put(absolutePath(table.descriptor.baseFilename()), table);
+
+        // maps sstable base file name to the actual files on disk; sstables with no files on disk get no record
+        Map<String, List<File>> existingFiles = getExistingFiles(absolutePaths.keySet());
+        List<LogRecord> records = new ArrayList<>(existingFiles.size());
+        for (Map.Entry<String, List<File>> entry : existingFiles.entrySet())
+        {
+            List<File> filesOnDisk = entry.getValue();
+            String baseFileName = entry.getKey();
+            SSTable sstable = absolutePaths.get(baseFileName);
+            records.add(make(type, filesOnDisk, sstable.getAllFilePaths().size(), baseFileName));
+        }
+        return records;
+    }
+
+    private static String absolutePath(String baseFilename)
+    {
+        return FileUtils.getCanonicalPath(baseFilename + Component.separator);
+    }
+
     public LogRecord withExistingFiles()
     {
         return make(type, getExistingFiles(), 0, absolutePath.get());
@@ -275,6 +303,41 @@ final class LogRecord
         return files == null ? Collections.emptyList() : Arrays.asList(files);
     }
 
+    /**
+     * absoluteFilePaths contains canonical file paths up to (and including the separator before) the component name.
+     *
+     * This method lists each distinct parent directory once and collects every file whose parsed sstable descriptor maps to one of these paths.
+     * @return a map from absoluteFilePath to the actual files on disk; paths with no matching files are absent from the map.
+     */
+    public static Map<String, List<File>> getExistingFiles(Set<String> absoluteFilePaths)
+    {
+        Set<File> uniqueDirectories = absoluteFilePaths.stream().map(path -> Paths.get(path).getParent().toFile()).collect(Collectors.toSet());
+        Map<String, List<File>> fileMap = new HashMap<>();
+        FilenameFilter ff = (dir, name) -> {
+            Descriptor descriptor = null;
+            try
+            {
+                descriptor = Descriptor.fromFilename(dir, name).left;
+            }
+            catch (Throwable t)
+            {// ignored - if we can't parse the filename, just skip the file
+            }
+
+            String absolutePath = descriptor != null ? absolutePath(descriptor.baseFilename()) : null;
+            if (absolutePath != null && absoluteFilePaths.contains(absolutePath))
+                fileMap.computeIfAbsent(absolutePath, k -> new ArrayList<>()).add(new File(dir, name));
+
+            return false;
+        };
+
+        // populate the file map (the filter always returns false, so listFiles is only used for its side effect):
+        for (File f : uniqueDirectories)
+            f.listFiles(ff);
+
+        return fileMap;
+    }
+
+
     public boolean isFinal()
     {
         return type.isFinal();


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 350477c..6599142 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -163,6 +163,22 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
         return new SSTableTidier(reader, false, this);
     }
 
+    Map<SSTableReader, SSTableTidier> bulkObsoletion(Iterable<SSTableReader> sstables)
+    {
+        if (!txnFile.isEmpty())
+            throw new IllegalStateException("Bad state when doing bulk obsoletions");
+
+        txnFile.addAll(Type.REMOVE, sstables);
+        Map<SSTableReader, SSTableTidier> tidiers = new HashMap<>();
+        for (SSTableReader sstable : sstables)
+        {
+            if (tracker != null)
+                tracker.notifyDeleting(sstable);
+            tidiers.put(sstable, new SSTableTidier(sstable, false, this));
+        }
+        return tidiers;
+    }
+
     OperationType type()
     {
         return txnFile.type();


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 9feaa3e..d281278 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -245,7 +245,7 @@ public class Tracker
             // It is important that any method accepting/returning a Throwable never throws an exception, and does its best
             // to complete the instructions given to it
             List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-            accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
+            accumulate = prepareForBulkObsoletion(removed, txnLogs, obsoletions, accumulate);
             try
             {
                 txnLogs.finish();


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 3549523..1b8e265 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -18,14 +18,21 @@
  */
 package org.apache.cassandra.db.lifecycle;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -158,12 +165,23 @@ public class HelpersTest
     @Test
     public void testMarkObsolete()
     {
+        testMarkObsoleteHelper(false);
+    }
+    @Test
+    public void testBulkMarkObsolete()
+    {
+        testMarkObsoleteHelper(true);
+    }
+
+    public void testMarkObsoleteHelper(boolean bulk)
+    {
         ColumnFamilyStore cfs = MockSchema.newCFS();
         LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
         Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+        Iterable<SSTableReader> readersToKeep = Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
 
         List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
+        Assert.assertNull(bulk ? Helpers.prepareForBulkObsoletion(readers, txnLogs, obsoletions, null) : Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
         assertNotNull(obsoletions);
         assertEquals(2, obsoletions.size());
 
@@ -172,9 +190,47 @@ public class HelpersTest
         for (SSTableReader reader : readers)
             Assert.assertTrue(reader.isMarkedCompacted());
 
+        for (SSTableReader reader : readersToKeep)
+            Assert.assertFalse(reader.isMarkedCompacted());
+
         accumulate = Helpers.markObsolete(obsoletions, null);
         assertNotNull(accumulate);
 
         txnLogs.finish();
     }
+
+    @Test
+    public void compareBulkAndNormalObsolete() throws IOException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
+        LogTransaction txnLogs2 = new LogTransaction(OperationType.UNKNOWN);
+
+        Collection<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+        // create a few more sstables that are NOT obsoleted; none of their files may show up in the log records below:
+        Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
+
+        List<LogTransaction.Obsoletion> normalObsoletions = new ArrayList<>();
+        List<LogTransaction.Obsoletion> bulkObsoletions = new ArrayList<>();
+
+        Assert.assertNull(Helpers.prepareForBulkObsoletion(readers, txnLogs, bulkObsoletions, null));
+        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs2, normalObsoletions, null));
+
+        assertEquals(Sets.newHashSet(readers), normalObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+        assertEquals(Sets.newHashSet(readers), bulkObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+
+        Set<String> normalLogRecords = new HashSet<>();
+        Set<String> bulkLogRecords = new HashSet<>();
+
+        for (File f : txnLogs.logFiles())
+            bulkLogRecords.addAll(Files.readAllLines(f.toPath()));
+        for (File f : txnLogs2.logFiles())
+            normalLogRecords.addAll(Files.readAllLines(f.toPath()));
+
+        Assert.assertEquals(readers.size(), normalLogRecords.size());
+        Assert.assertEquals(bulkLogRecords, normalLogRecords);
+
+        txnLogs.finish();
+        txnLogs2.finish();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org