Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 2f268eda3 -> bed3def9a
  refs/heads/cassandra-3.X 5439d94c5 -> f1423806e
  refs/heads/trunk 9a7baa145 -> 48591489d


Revert "Make sure sstables only get committed when it's safe to discard commit log records"

This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 because it applied
the wrong version of the patch.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f

Branch: refs/heads/cassandra-3.11
Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4
Parents: 6f90e55
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Tue Dec 6 14:06:48 2016 +0200
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Tue Dec 6 14:06:48 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 ++++-------------
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++++--------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 --------------------
 4 files changed, 63 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5242adf..5cacdd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
 3.0.11
- * Make sure sstables only get committed when it's safe to discard commit log 
records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs 
(CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113e10d..d2a51a9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,7 +81,6 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
-import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -126,8 +124,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     private static final Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    @VisibleForTesting
-    public static final ExecutorService flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    private static final ExecutorService flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                
           StageManager.KEEPALIVE,
                                                                                
           TimeUnit.SECONDS,
                                                                                
           new LinkedBlockingQueue<Runnable>(),
@@ -924,9 +921,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
-        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
-        volatile Throwable flushFailure = null;
+        final CountDownLatch latch = new CountDownLatch(1);
+        volatile FSWriteError flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier 
writeBarrier,
@@ -947,27 +943,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom 
implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-            try
-            {
-                if (flushSecondaryIndexes)
-                {
-                    indexManager.flushAllNonCFSBackedIndexesBlocking();
-                }
-            }
-            catch (Throwable e)
-            {
-                flushFailure = merge(flushFailure, e);
-            }
-            finally
-            {
-                secondaryIndexFlushLatch.countDown();
-            }
+
+            if (flushSecondaryIndexes)
+                indexManager.flushAllNonCFSBackedIndexesBlocking();
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, 
and so that waiters
                 // on this task can rely on all prior flushes being complete
-                memtablesFlushLatch.await();
+                latch.await();
             }
             catch (InterruptedException e)
             {
@@ -986,7 +970,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                Throwables.propagate(flushFailure);
+                throw flushFailure;
 
             return commitLogUpperBound;
         }
@@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                    flushMemtable(memtable);
+                {
+                    Collection<SSTableReader> readers = 
Collections.emptyList();
+                    if (!memtable.isClean() && !truncate)
+                        readers = memtable.flush();
+                    memtable.cfs.replaceFlushed(memtable, readers);
+                    reclaim(memtable);
+                }
             }
-            catch (Throwable e)
+            catch (FSWriteError e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow 
CommitLog to be discarded.
@@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.memtablesFlushLatch.countDown();
-        }
-
-        public Collection<SSTableReader> flushMemtable(Memtable memtable)
-        {
-            if (memtable.isClean() || truncate)
-            {
-                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
-                reclaim(memtable);
-                return Collections.emptyList();
-            }
-
-            Collection<SSTableReader> readers = Collections.emptyList();
-            try (SSTableTxnWriter writer = memtable.flush())
-            {
-                try
-                {
-                    postFlush.secondaryIndexFlushLatch.await();
-                }
-                catch (InterruptedException e)
-                {
-                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
-                }
-
-                if (postFlush.flushFailure == null && writer.getFilePointer() 
> 0)
-                    // sstables should contain non-repaired data.
-                    readers = writer.finish(true);
-                else
-                    maybeFail(writer.abort(postFlush.flushFailure));
-            }
-
-            memtable.cfs.replaceFlushed(memtable, readers);
-            reclaim(memtable);
-            return readers;
+            postFlush.latch.countDown();
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java 
b/src/java/org/apache/cassandra/db/Memtable.java
index 6404b37..1a7d6cb 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,7 +48,6 @@ import 
org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public SSTableTxnWriter flush()
+    public Collection<SSTableReader> flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = 
cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
+    private Collection<SSTableReader> writeSortedContents(File 
sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && 
cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        SSTableTxnWriter writer = 
createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), 
statsCollector.get());
-        boolean trackContention = logger.isTraceEnabled();
-        int heavilyContendedRowCount = 0;
-        // (we can't clear out the map as-we-go to free up memory,
-        //  since the memtable is being used for queries in the "pending 
flush" category)
-        for (AtomicBTreePartition partition : partitions.values())
+        Collection<SSTableReader> ssTables;
+        try (SSTableTxnWriter writer = 
createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), 
statsCollector.get()))
         {
-            // Each batchlog partition is a separate entry in the log. And for 
an entry, we only do 2
-            // operations: 1) we insert the entry and 2) we delete it. 
Further, BL data is strictly local,
-            // we don't need to preserve tombstones for repair. So if both 
operation are in this
-            // memtable (which will almost always be the case if there is no 
ongoing failure), we can
-            // just skip the entry (CASSANDRA-4667).
-            if (isBatchLogTable && 
!partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                continue;
-
-            if (trackContention && partition.usePessimisticLocking())
-                heavilyContendedRowCount++;
-
-            if (!partition.isEmpty())
+            boolean trackContention = logger.isTraceEnabled();
+            int heavilyContendedRowCount = 0;
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending 
flush" category)
+            for (AtomicBTreePartition partition : partitions.values())
             {
-                try (UnfilteredRowIterator iter = 
partition.unfilteredIterator())
+                // Each batchlog partition is a separate entry in the log. And 
for an entry, we only do 2
+                // operations: 1) we insert the entry and 2) we delete it. 
Further, BL data is strictly local,
+                // we don't need to preserve tombstones for repair. So if both 
operation are in this
+                // memtable (which will almost always be the case if there is 
no ongoing failure), we can
+                // just skip the entry (CASSANDRA-4667).
+                if (isBatchLogTable && 
!partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                    continue;
+
+                if (trackContention && partition.usePessimisticLocking())
+                    heavilyContendedRowCount++;
+
+                if (!partition.isEmpty())
                 {
-                    writer.append(iter);
+                    try (UnfilteredRowIterator iter = 
partition.unfilteredIterator())
+                    {
+                        writer.append(iter);
+                    }
                 }
             }
-        }
 
-        if (writer.getFilePointer() > 0)
-            logger.debug(String.format("Completed flushing %s (%s) for 
commitlog position %s",
-                                       writer.getFilename(),
-                                       
FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                       commitLogUpperBound));
-        else
-            logger.debug("Completed flushing {}; nothing needed to be 
retained.  Commitlog position was {}",
-                         writer.getFilename(), commitLogUpperBound);
+            if (writer.getFilePointer() > 0)
+            {
+                logger.debug(String.format("Completed flushing %s (%s) for 
commitlog position %s",
+                                           writer.getFilename(),
+                                           
FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                           commitLogUpperBound));
 
-        if (heavilyContendedRowCount > 0)
-            logger.trace(String.format("High update contention in %d/%d 
partitions of %s ", heavilyContendedRowCount, partitions.size(), 
Memtable.this.toString()));
+                // sstables should contain non-repaired data.
+                ssTables = writer.finish(true);
+            }
+            else
+            {
+                logger.debug("Completed flushing {}; nothing needed to be 
retained.  Commitlog position was {}",
+                             writer.getFilename(), commitLogUpperBound);
+                writer.abort();
+                ssTables = Collections.emptyList();
+            }
 
-        return writer;
+            if (heavilyContendedRowCount > 0)
+                logger.trace(String.format("High update contention in %d/%d 
partitions of %s ", heavilyContendedRowCount, partitions.size(), 
Memtable.this.toString()));
+
+            return ssTables;
+        }
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
deleted file mode 100644
index 1285392..0000000
--- 
a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.validation.miscellaneous;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.junit.Test;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ColumnFamilyStoreTest extends CQLTester
-{
-    @Test
-    public void testFailing2iFlush() throws Throwable
-    {
-        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
-        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 
'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
-
-        for (int i = 0; i < 10; i++)
-            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
-
-        try
-        {
-            getCurrentColumnFamilyStore().forceBlockingFlush();
-        }
-        catch (Throwable t)
-        {
-            // ignore
-        }
-
-        // Make sure there's no flush running
-        waitFor(() -> ((JMXEnabledThreadPoolExecutor) 
ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
-                TimeUnit.SECONDS.toMillis(5));
-
-        // SSTables remain uncommitted.
-        assertEquals(1, 
getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
-    }
-
-    public void waitFor(Supplier<Boolean> condition, long timeout)
-    {
-        long start = System.currentTimeMillis();
-        while(true)
-        {
-            if (condition.get())
-                return;
-
-            assertTrue("Timeout ocurred while waiting for condition",
-                       System.currentTimeMillis() - start < timeout);
-        }
-    }
-
-    // Used for index creation above
-    public static class BrokenCustom2I extends StubIndex
-    {
-        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata 
metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public Callable<?> getBlockingFlushTask()
-        {
-            throw new RuntimeException();
-        }
-    }
-}

Reply via email to