Make sure sstables only get committed when it's safe to discard commit log 
records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315

Branch: refs/heads/cassandra-3.11
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 45 +++++++++-----------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
 3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log 
records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs 
(CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      */
     private final class PostFlush implements Callable<ReplayPosition>
     {
-        final boolean flushSecondaryIndexes;
-        final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier 
writeBarrier,
-                          List<Memtable> memtables)
+        private PostFlush(List<Memtable> memtables)
         {
-            this.writeBarrier = writeBarrier;
-            this.flushSecondaryIndexes = flushSecondaryIndexes;
             this.memtables = memtables;
         }
 
         public ReplayPosition call()
         {
-            writeBarrier.await();
-
-            /**
-             * we can flush 2is as soon as the barrier completes, as they will 
be consistent with (or ahead of) the
-             * flushed memtables and CL position, which is as good as we can 
guarantee.
-             * TODO: SecondaryIndex should support setBarrier(), so custom 
implementations can co-ordinate exactly
-             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-             */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
-
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, 
and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             // since this happens after wiring up the commitLogUpperBound, we 
also know all operations with earlier
             // replay positions have also completed, i.e. the memtables are 
done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+            postFlush = new PostFlush(memtables);
         }
 
         public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
             try
             {
+                boolean flushNonCf2i = true;
                 for (Memtable memtable : memtables)
                 {
                     Collection<SSTableReader> readers = 
Collections.emptyList();
                     if (!memtable.isClean() && !truncate)
+                    {
+                        // TODO: SecondaryIndex should support setBarrier(), 
so custom implementations can co-ordinate exactly
+                        // with CL as we do with memtables/CFS-backed 
SecondaryIndexes.
+                        if (flushNonCf2i)
+                        {
+                            indexManager.flushAllNonCFSBackedIndexesBlocking();
+                            flushNonCf2i = false;
+                        }
                         readers = memtable.flush();
+                    }
                     memtable.cfs.replaceFlushed(memtable, readers);
                     reclaim(memtable);
                 }
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow 
CommitLog to be discarded.
                 postFlush.flushFailure = e;
             }
-
-            // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            finally
+            {
+                // signal the post-flush we've done our work
+                postFlush.latch.countDown();
+            }
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java 
b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
         assertEquals("bar", 
IndexWithOverloadedValidateOptions.options.get("foo"));
     }
 
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 
'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            fail("Flush should have thrown an exception.");
+        }
+        catch (Throwable t)
+        {
+            assertTrue(t.getMessage().contains("Broken2I"));
+        }
+
+        // SSTables remain uncommitted.
+        assertEquals(1, 
getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata 
metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException("Broken2I");
+        }
+    }
+
     private void testCreateIndex(String indexName, String... 
targetColumnNames) throws Throwable
     {
         createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING 
'%s'",

Reply via email to