Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f39e28dc7 -> d73f45bad
  refs/heads/cassandra-3.11 a798257ce -> abd9be1e4
  refs/heads/trunk 32110d6d2 -> 720772033
Pre-create deletion log records to finish compactions quicker

Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-12763

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d73f45ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d73f45ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d73f45ba

Branch: refs/heads/cassandra-3.0
Commit: d73f45bad4cd6d8cf1cea7d9b35b76075dc277e1
Parents: f39e28d
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Dec 11 15:11:20 2017 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Feb 19 07:52:20 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/lifecycle/Helpers.java  | 19 ++-----
 .../apache/cassandra/db/lifecycle/LogFile.java  | 37 +++++++++++--
 .../cassandra/db/lifecycle/LogRecord.java       | 11 ++--
 .../cassandra/db/lifecycle/LogTransaction.java  | 33 ++++++------
 .../apache/cassandra/db/lifecycle/Tracker.java  |  2 +-
 .../cassandra/db/lifecycle/HelpersTest.java     | 56 +++++---------------
 7 files changed, 74 insertions(+), 85 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 14a62a4..8cf665e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763)
 Merged from 2.1:
  * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183)
 Merged from 2.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java 
b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java index b9adc4b..8e0d514 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java @@ -22,6 +22,7 @@ import java.util.*; import com.google.common.base.Predicate; import com.google.common.collect.*; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.Throwables; @@ -127,11 +128,12 @@ class Helpers static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate) { + Map<SSTable, LogRecord> logRecords = txnLogs.makeRemoveRecords(readers); for (SSTableReader reader : readers) { try { - obsoletions.add(new LogTransaction.Obsoletion(reader, txnLogs.obsoleted(reader))); + obsoletions.add(new LogTransaction.Obsoletion(reader, txnLogs.obsoleted(reader, logRecords.get(reader)))); } catch (Throwable t) { @@ -141,21 +143,6 @@ class Helpers return accumulate; } - static Throwable prepareForBulkObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate) - { - try - { - for (Map.Entry<SSTableReader, LogTransaction.SSTableTidier> entry : txnLogs.bulkObsoletion(readers).entrySet()) - obsoletions.add(new LogTransaction.Obsoletion(entry.getKey(), entry.getValue())); - } - catch (Throwable t) - { - accumulate = Throwables.merge(accumulate, t); - } - - return accumulate; - } - static Throwable abortObsoletion(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate) { if (obsoletions == null || obsoletions.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java index be26163..8425a6d 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@ -281,18 +281,23 @@ final class LogFile implements AutoCloseable void add(Type type, SSTable table) { - if (!addRecord(makeRecord(type, table))) + add(makeRecord(type, table)); + } + + void add(LogRecord record) + { + if (!addRecord(record)) throw new IllegalStateException(); } public void addAll(Type type, Iterable<SSTableReader> toBulkAdd) { - for (LogRecord record : makeRecords(type, toBulkAdd)) + for (LogRecord record : makeRecords(type, toBulkAdd).values()) if (!addRecord(record)) throw new IllegalStateException(); } - private Collection<LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables) + Map<SSTable, LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables) { assert type == Type.ADD || type == Type.REMOVE; @@ -313,6 +318,20 @@ final class LogFile implements AutoCloseable return LogRecord.make(type, table); } + /** + * this version of makeRecord takes an existing LogRecord and converts it to a + * record with the given type. 
This avoids listing the directory and if the + * LogRecord already exists, we have all components for the sstable + */ + private LogRecord makeRecord(Type type, SSTable table, LogRecord record) + { + assert type == Type.ADD || type == Type.REMOVE; + + File folder = table.descriptor.directory; + replicas.maybeCreateReplica(folder, getFileName(folder), records); + return record.asType(type); + } + private boolean addRecord(LogRecord record) { if (records.contains(record)) @@ -334,7 +353,17 @@ final class LogFile implements AutoCloseable boolean contains(Type type, SSTable table) { - return records.contains(makeRecord(type, table)); + return contains(makeRecord(type, table)); + } + + boolean contains(Type type, SSTable sstable, LogRecord record) + { + return contains(makeRecord(type, sstable, record)); + } + + private boolean contains(LogRecord record) + { + return records.contains(record); } void deleteFilesForRecordsOfType(Type type) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java index a322ea1..1dc17f6 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java @@ -158,7 +158,7 @@ final class LogRecord return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath); } - public static Collection<LogRecord> make(Type type, Iterable<SSTableReader> tables) + public static Map<SSTable, LogRecord> make(Type type, Iterable<SSTableReader> tables) { // contains a mapping from sstable absolute path (everything up until the 'Data'/'Index'/etc part of the filename) to the sstable Map<String, SSTable> absolutePaths = new HashMap<>(); @@ -167,13 +167,13 @@ final class LogRecord // maps sstable 
base file name to the actual files on disk Map<String, List<File>> existingFiles = getExistingFiles(absolutePaths.keySet()); - List<LogRecord> records = new ArrayList<>(existingFiles.size()); + Map<SSTable, LogRecord> records = new HashMap<>(existingFiles.size()); for (Map.Entry<String, List<File>> entry : existingFiles.entrySet()) { List<File> filesOnDisk = entry.getValue(); String baseFileName = entry.getKey(); SSTable sstable = absolutePaths.get(baseFileName); - records.add(make(type, filesOnDisk, sstable.getAllFilePaths().size(), baseFileName)); + records.put(sstable, make(type, filesOnDisk, sstable.getAllFilePaths().size(), baseFileName)); } return records; } @@ -415,4 +415,9 @@ final class LogRecord FBUtilities.updateChecksumInt(crc32, numFiles); return crc32.getValue() & (Long.MAX_VALUE); } + + LogRecord asType(Type type) + { + return new LogRecord(type, absolutePath.orElse(null), updateTime, numFiles); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index 6599142..a10bcd2 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@ -143,19 +143,28 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran } /** + * helper method for tests, creates the remove records per sstable + */ + @VisibleForTesting + SSTableTidier obsoleted(SSTableReader sstable) + { + return obsoleted(sstable, LogRecord.make(Type.REMOVE, sstable)); + } + + /** * Schedule a reader for deletion as soon as it is fully unreferenced. 
*/ - SSTableTidier obsoleted(SSTableReader reader) + SSTableTidier obsoleted(SSTableReader reader, LogRecord logRecord) { - if (txnFile.contains(Type.ADD, reader)) + if (txnFile.contains(Type.ADD, reader, logRecord)) { - if (txnFile.contains(Type.REMOVE, reader)) + if (txnFile.contains(Type.REMOVE, reader, logRecord)) throw new IllegalArgumentException(); return new SSTableTidier(reader, true, this); } - txnFile.add(Type.REMOVE, reader); + txnFile.add(logRecord); if (tracker != null) tracker.notifyDeleting(reader); @@ -163,22 +172,12 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran return new SSTableTidier(reader, false, this); } - Map<SSTableReader, SSTableTidier> bulkObsoletion(Iterable<SSTableReader> sstables) + Map<SSTable, LogRecord> makeRemoveRecords(Iterable<SSTableReader> sstables) { - if (!txnFile.isEmpty()) - throw new IllegalStateException("Bad state when doing bulk obsoletions"); - - txnFile.addAll(Type.REMOVE, sstables); - Map<SSTableReader, SSTableTidier> tidiers = new HashMap<>(); - for (SSTableReader sstable : sstables) - { - if (tracker != null) - tracker.notifyDeleting(sstable); - tidiers.put(sstable, new SSTableTidier(sstable, false, this)); - } - return tidiers; + return txnFile.makeRecords(Type.REMOVE, sstables); } + OperationType type() { return txnFile.type(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index d281278..9feaa3e 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -245,7 +245,7 @@ public class Tracker // It is important that any method accepting/returning a Throwable never throws an exception, and does its best // to complete the instructions 
given to it List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>(); - accumulate = prepareForBulkObsoletion(removed, txnLogs, obsoletions, accumulate); + accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate); try { txnLogs.finish(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d73f45ba/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java index 1b8e265..1d9f8aa 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java @@ -18,21 +18,14 @@ */ package org.apache.cassandra.db.lifecycle; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.junit.BeforeClass; import org.junit.Test; @@ -45,6 +38,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class HelpersTest { @@ -165,23 +159,13 @@ public class HelpersTest @Test public void testMarkObsolete() { - testMarkObsoleteHelper(false); - } - @Test - public void testBulkMarkObsolete() - { - testMarkObsoleteHelper(true); - } - - public void testMarkObsoleteHelper(boolean bulk) - { ColumnFamilyStore cfs = MockSchema.newCFS(); LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN); Iterable<SSTableReader> readers = 
Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs)); Iterable<SSTableReader> readersToKeep = Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs)); List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>(); - Assert.assertNull(bulk ? Helpers.prepareForBulkObsoletion(readers, txnLogs, obsoletions, null) : Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null)); + Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null); assertNotNull(obsoletions); assertEquals(2, obsoletions.size()); @@ -200,37 +184,21 @@ public class HelpersTest } @Test - public void compareBulkAndNormalObsolete() throws IOException + public void testObsoletionPerformance() { ColumnFamilyStore cfs = MockSchema.newCFS(); LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN); - LogTransaction txnLogs2 = new LogTransaction(OperationType.UNKNOWN); - - Collection<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs)); - // add a few readers that should not be removed: - Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs)); + List<SSTableReader> readers = new ArrayList<>(); - List<LogTransaction.Obsoletion> normalObsoletions = new ArrayList<>(); - List<LogTransaction.Obsoletion> bulkObsoletions = new ArrayList<>(); - - Assert.assertNull(Helpers.prepareForBulkObsoletion(readers, txnLogs, normalObsoletions, null)); - Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs2, bulkObsoletions, null)); - - assertEquals(Sets.newHashSet(readers), normalObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet())); - assertEquals(Sets.newHashSet(readers), bulkObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet())); - - Set<String> normalLogRecords = new HashSet<>(); - Set<String> bulkLogRecords = new HashSet<>(); - - for (File f : txnLogs.logFiles()) - Files.lines(f.toPath()).forEach(bulkLogRecords::add); - for (File f : 
txnLogs2.logFiles()) - Files.lines(f.toPath()).forEach(normalLogRecords::add); - - Assert.assertEquals(readers.size(), normalLogRecords.size()); - Assert.assertEquals(bulkLogRecords, normalLogRecords); + for (int i = 0; i < 10000; i++) + { + readers.add(MockSchema.sstable(i + 1, cfs)); + } + long start = System.currentTimeMillis(); + Helpers.prepareForObsoletion(readers.subList(0, 500), txnLogs, new ArrayList<>(),null ); txnLogs.finish(); - txnLogs2.finish(); + long time = System.currentTimeMillis() - start; + assertTrue(time < 20000); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org