Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 5f64ed7cc -> 6f90e55e7
  refs/heads/cassandra-3.11 b207f2e3b -> 2f268eda3
  refs/heads/cassandra-3.X 838a21d40 -> 5439d94c5
  refs/heads/trunk 8ddbb7493 -> 9a7baa145


Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e

Branch: refs/heads/cassandra-3.0
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 +++++++++++++----
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++----------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    @VisibleForTesting
+    public static final ExecutorService flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                
           StageManager.KEEPALIVE,
                                                                                
           TimeUnit.SECONDS,
                                                                                
           new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier 
writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom 
implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
+            try
+            {
+                if (flushSecondaryIndexes)
+                {
+                    indexManager.flushAllNonCFSBackedIndexesBlocking();
+                }
+            }
+            catch (Throwable e)
+            {
+                flushFailure = merge(flushFailure, e);
+            }
+            finally
+            {
+                secondaryIndexFlushLatch.countDown();
+            }
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, 
and so that waiters
                 // on this task can rely on all prior flushes being complete
-                latch.await();
+                memtablesFlushLatch.await();
             }
             catch (InterruptedException e)
             {
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                {
-                    Collection<SSTableReader> readers = 
Collections.emptyList();
-                    if (!memtable.isClean() && !truncate)
-                        readers = memtable.flush();
-                    memtable.cfs.replaceFlushed(memtable, readers);
-                    reclaim(memtable);
-                }
+                    flushMemtable(memtable);
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow 
CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            postFlush.memtablesFlushLatch.countDown();
+        }
+
+        public Collection<SSTableReader> flushMemtable(Memtable memtable)
+        {
+            if (memtable.isClean() || truncate)
+            {
+                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+                reclaim(memtable);
+                return Collections.emptyList();
+            }
+
+            Collection<SSTableReader> readers = Collections.emptyList();
+            try (SSTableTxnWriter writer = memtable.flush())
+            {
+                try
+                {
+                    postFlush.secondaryIndexFlushLatch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
+                }
+
+                if (postFlush.flushFailure == null && writer.getFilePointer() 
> 0)
+                    // sstables should contain non-repaired data.
+                    readers = writer.finish(true);
+                else
+                    maybeFail(writer.abort(postFlush.flushFailure));
+            }
+
+            memtable.cfs.replaceFlushed(memtable, readers);
+            reclaim(memtable);
+            return readers;
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public Collection<SSTableReader> flush()
+    public SSTableTxnWriter flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = 
cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private Collection<SSTableReader> writeSortedContents(File 
sstableDirectory)
+    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && 
cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        Collection<SSTableReader> ssTables;
-        try (SSTableTxnWriter writer = 
createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), 
statsCollector.get()))
+        SSTableTxnWriter writer = 
createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), 
statsCollector.get());
+        boolean trackContention = logger.isTraceEnabled();
+        int heavilyContendedRowCount = 0;
+        // (we can't clear out the map as-we-go to free up memory,
+        //  since the memtable is being used for queries in the "pending 
flush" category)
+        for (AtomicBTreePartition partition : partitions.values())
         {
-            boolean trackContention = logger.isTraceEnabled();
-            int heavilyContendedRowCount = 0;
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending 
flush" category)
-            for (AtomicBTreePartition partition : partitions.values())
+            // Each batchlog partition is a separate entry in the log. And for 
an entry, we only do 2
+            // operations: 1) we insert the entry and 2) we delete it. 
Further, BL data is strictly local,
+            // we don't need to preserve tombstones for repair. So if both 
operation are in this
+            // memtable (which will almost always be the case if there is no 
ongoing failure), we can
+            // just skip the entry (CASSANDRA-4667).
+            if (isBatchLogTable && 
!partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                continue;
+
+            if (trackContention && partition.usePessimisticLocking())
+                heavilyContendedRowCount++;
+
+            if (!partition.isEmpty())
             {
-                // Each batchlog partition is a separate entry in the log. And 
for an entry, we only do 2
-                // operations: 1) we insert the entry and 2) we delete it. 
Further, BL data is strictly local,
-                // we don't need to preserve tombstones for repair. So if both 
operation are in this
-                // memtable (which will almost always be the case if there is 
no ongoing failure), we can
-                // just skip the entry (CASSANDRA-4667).
-                if (isBatchLogTable && 
!partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                    continue;
-
-                if (trackContention && partition.usePessimisticLocking())
-                    heavilyContendedRowCount++;
-
-                if (!partition.isEmpty())
+                try (UnfilteredRowIterator iter = 
partition.unfilteredIterator())
                 {
-                    try (UnfilteredRowIterator iter = 
partition.unfilteredIterator())
-                    {
-                        writer.append(iter);
-                    }
+                    writer.append(iter);
                 }
             }
+        }
 
-            if (writer.getFilePointer() > 0)
-            {
-                logger.debug(String.format("Completed flushing %s (%s) for 
commitlog position %s",
-                                           writer.getFilename(),
-                                           
FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                           commitLogUpperBound));
-
-                // sstables should contain non-repaired data.
-                ssTables = writer.finish(true);
-            }
-            else
-            {
-                logger.debug("Completed flushing {}; nothing needed to be 
retained.  Commitlog position was {}",
-                             writer.getFilename(), commitLogUpperBound);
-                writer.abort();
-                ssTables = Collections.emptyList();
-            }
+        if (writer.getFilePointer() > 0)
+            logger.debug(String.format("Completed flushing %s (%s) for 
commitlog position %s",
+                                       writer.getFilename(),
+                                       
FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                       commitLogUpperBound));
+        else
+            logger.debug("Completed flushing {}; nothing needed to be 
retained.  Commitlog position was {}",
+                         writer.getFilename(), commitLogUpperBound);
 
-            if (heavilyContendedRowCount > 0)
-                logger.trace(String.format("High update contention in %d/%d 
partitions of %s ", heavilyContendedRowCount, partitions.size(), 
Memtable.this.toString()));
+        if (heavilyContendedRowCount > 0)
+            logger.trace(String.format("High update contention in %d/%d 
partitions of %s ", heavilyContendedRowCount, partitions.size(), 
Memtable.this.toString()));
 
-            return ssTables;
-        }
+        return writer;
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 
'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        catch (Throwable t)
+        {
+            // ignore
+        }
+
+        // Make sure there's no flush running
+        waitFor(() -> ((JMXEnabledThreadPoolExecutor) 
ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+                TimeUnit.SECONDS.toMillis(5));
+
+        // SSTables remain uncommitted.
+        assertEquals(1, 
getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    public void waitFor(Supplier<Boolean> condition, long timeout)
+    {
+        long start = System.currentTimeMillis();
+        while(true)
+        {
+            if (condition.get())
+                return;
+
+            assertTrue("Timeout ocurred while waiting for condition",
+                       System.currentTimeMillis() - start < timeout);
+        }
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata 
metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException();
+        }
+    }
+}

Reply via email to