Repository: cassandra Updated Branches: refs/heads/trunk 2240455f0 -> c64ac4188
Remove tmplink files for offline compactions Patch by marcuse; reviewed by jmckenzie for CASSANDRA-8321 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29259cb2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29259cb2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29259cb2 Branch: refs/heads/trunk Commit: 29259cb22c2ba02d5c2beba6c6512173f8b5b3f9 Parents: d69728f Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Nov 25 11:12:20 2014 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Dec 10 14:46:44 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/SSTableRewriter.java | 31 +++++-- .../io/sstable/SSTableRewriterTest.java | 91 +++++++++++--------- 3 files changed, 79 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3545afc..2e74a15 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * Remove tmplink files for offline compactions (CASSANDRA-8321) * Reduce maxHintsInProgress (CASSANDRA-8415) * BTree updates may call provided update function twice (CASSANDRA-8018) * Release sstable references after anticompaction (CASSANDRA-8386) http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index d187e9d..f9d2fe4 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -190,9 +190,15 @@ public class SSTableRewriter for (Pair<SSTableWriter, SSTableReader> w : finishedWriters) { - // we should close the bloom filter if we have not opened an sstable reader from this - // writer (it will get closed when we release the sstable reference below): + // we should close the bloom filter if we have not opened an sstable reader from this + // writer (it will get closed when we release the sstable reference below): w.left.abort(w.right == null); + if (isOffline && w.right != null) + { + // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here: + w.right.markObsolete(); + w.right.releaseReference(); + } } // also remove already completed SSTables @@ -344,7 +350,15 @@ public class SSTableRewriter finished.add(newReader); if (w.right != null) + { w.right.sharesBfWith(newReader); + if (isOffline) + { + // remove the tmplink files if we are offline - no one is using them + w.right.markObsolete(); + w.right.releaseReference(); + } + } // w.right is the tmplink-reader we added when switching writer, replace with the real sstable. toReplace.add(Pair.create(w.right, newReader)); } @@ -356,11 +370,10 @@ public class SSTableRewriter it.remove(); } - for (Pair<SSTableReader, SSTableReader> replace : toReplace) - replaceEarlyOpenedFile(replace.left, replace.right); - if (!isOffline) { + for (Pair<SSTableReader, SSTableReader> replace : toReplace) + replaceEarlyOpenedFile(replace.left, replace.right); dataTracker.unmarkCompacting(finished); } return finished; @@ -382,8 +395,16 @@ public class SSTableRewriter { SSTableReader newReader = w.left.closeAndOpenReader(maxAge); finished.add(newReader); + if (w.right != null) + { w.right.sharesBfWith(newReader); + if (isOffline) + { + w.right.markObsolete(); + w.right.releaseReference(); + } + } // w.right is the tmplink-reader we added when switching writer, replace with the real sstable. toReplace.add(Pair.create(w.right, newReader)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 0a76b66..c0a017e 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -43,6 +43,7 @@ import org.apache.cassandra.db.compaction.CompactionController; import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.compaction.SSTableSplitter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.StorageService; @@ -457,61 +458,59 @@ public class SSTableRewriterTest extends SchemaLoader validateCFS(cfs); } @Test - public void testAbort() throws Exception + public void testSSTableSplit() throws InterruptedException { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); cfs.truncateBlocking(); + cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 1000); - cfs.addSSTable(s); - Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - SSTableWriter w = getWriter(cfs, s.descriptor.directory); - rewriter.switchWriter(w); - try (ICompactionScanner scanner = compacting.iterator().next().getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + cfs.getDataTracker().markCompacting(Arrays.asList(s)); + SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10); + splitter.split(); + Thread.sleep(1000); + assertFileCounts(s.descriptor.directory.list(), 0, 0); + for (File f : s.descriptor.directory.listFiles()) { - while (scanner.hasNext()) - { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) - { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - } - } - try - { - rewriter.finishAndThrow(false); - } - catch (Throwable t) - { - rewriter.abort(); - } + // we need to clear out the data dir, otherwise tests running after this breaks + f.delete(); } - Thread.sleep(1000); - int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); - assertEquals(filecount, 1); - assertEquals(1, cfs.getSSTables().size()); - validateCFS(cfs); - cfs.truncateBlocking(); - Thread.sleep(1000); - filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); - assertEquals(0, filecount); + } + @Test + public void testOfflineAbort() throws Exception + { + testAbortHelper(true, true); + } + @Test + public void testOfflineAbort2() throws Exception + { + testAbortHelper(false, true); + } + + @Test + public void testAbort() throws Exception + { + testAbortHelper(false, false); } @Test public void testAbort2() throws Exception { + testAbortHelper(true, false); + } + + private void testAbortHelper(boolean earlyException, boolean offline) throws Exception + { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); cfs.truncateBlocking(); SSTableReader s = writeFile(cfs, 1000); - cfs.addSSTable(s); + if (!offline) + cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); + SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline); SSTableWriter w = getWriter(cfs, s.descriptor.directory); rewriter.switchWriter(w); try (ICompactionScanner scanner = compacting.iterator().next().getScanner(); @@ -527,7 +526,7 @@ public class SSTableRewriterTest extends SchemaLoader } try { - rewriter.finishAndThrow(true); + rewriter.finishAndThrow(earlyException); } catch (Throwable t) { @@ -537,11 +536,25 @@ public class SSTableRewriterTest extends SchemaLoader Thread.sleep(1000); int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); assertEquals(filecount, 1); - assertEquals(1, cfs.getSSTables().size()); - validateCFS(cfs); + if (!offline) + { + assertEquals(1, cfs.getSSTables().size()); + validateCFS(cfs); + } cfs.truncateBlocking(); Thread.sleep(1000); filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + if (offline) + { + // the file is not added to the CFS, therefor not truncated away above + assertEquals(1, filecount); + for (File f : s.descriptor.directory.listFiles()) + { + f.delete(); + } + filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + } + assertEquals(0, filecount); }