Make sure SSTables are only committed when it is safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315 Branch: refs/heads/cassandra-3.X Commit: 0ecef31548c287ac2d9f818413457bc947362733 Parents: d2ba715 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Tue Nov 29 22:58:36 2016 +0100 Committer: Branimir Lambov <branimir.lam...@datastax.com> Committed: Tue Dec 6 14:10:00 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 45 +++++++++----------- .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++ 3 files changed, 58 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5cacdd0..5242adf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956) * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868) * Nodetool should use a more sane max heap size (CASSANDRA-12739) * LocalToken ensures token values are cloned on heap (CASSANDRA-12651) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index d2a51a9..71e1653 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ private final class PostFlush implements Callable<ReplayPosition> { - final boolean flushSecondaryIndexes; - final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); - volatile FSWriteError flushFailure = null; + volatile Throwable flushFailure = null; final List<Memtable> memtables; - private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, - List<Memtable> memtables) + private PostFlush(List<Memtable> memtables) { - this.writeBarrier = writeBarrier; - this.flushSecondaryIndexes = flushSecondaryIndexes; this.memtables = memtables; } public ReplayPosition call() { - writeBarrier.await(); - - /** - * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the - * flushed memtables and CL position, which is as good as we can guarantee. - * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly - * with CL as we do with memtables/CFS-backed SecondaryIndexes. - */ - - if (flushSecondaryIndexes) - indexManager.flushAllNonCFSBackedIndexesBlocking(); - try { // we wait on the latch for the commitLogUpperBound to be set, and so that waiters @@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean metric.pendingFlushes.dec(); if (flushFailure != null) - throw flushFailure; + Throwables.propagate(flushFailure); return commitLogUpperBound; } @@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier // replay positions have also completed, i.e. 
the memtables are done and ready to flush writeBarrier.issue(); - postFlush = new PostFlush(!truncate, writeBarrier, memtables); + postFlush = new PostFlush(memtables); } public void run() @@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { + boolean flushNonCf2i = true; for (Memtable memtable : memtables) { Collection<SSTableReader> readers = Collections.emptyList(); if (!memtable.isClean() && !truncate) + { + // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly + // with CL as we do with memtables/CFS-backed SecondaryIndexes. + if (flushNonCf2i) + { + indexManager.flushAllNonCFSBackedIndexesBlocking(); + flushNonCf2i = false; + } readers = memtable.flush(); + } memtable.cfs.replaceFlushed(memtable, readers); reclaim(memtable); } } - catch (FSWriteError e) + catch (Throwable e) { JVMStabilityInspector.inspectThrowable(e); // If we weren't killed, try to continue work but do not allow CommitLog to be discarded. 
postFlush.flushFailure = e; } - - // signal the post-flush we've done our work - postFlush.latch.countDown(); + finally + { + // signal the post-flush we've done our work + postFlush.latch.countDown(); + } } private void reclaim(final Memtable memtable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index b8e4185..6930d13 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo")); } + @Test + public void testFailing2iFlush() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)"); + createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'"); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i); + + try + { + getCurrentColumnFamilyStore().forceBlockingFlush(); + fail("Flush should have thrown an exception."); + } + catch (Throwable t) + { + assertTrue(t.getMessage().contains("Broken2I")); + } + + // SSTables remain uncommitted. + assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length); + } + + // Used for index creation above + public static class BrokenCustom2I extends StubIndex + { + public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public Callable<?> getBlockingFlushTask() + { + throw new RuntimeException("Broken2I"); + } + } + private void testCreateIndex(String indexName, String... 
targetColumnNames) throws Throwable { createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",