Repository: cassandra Updated Branches: refs/heads/cassandra-3.11 2f268eda3 -> bed3def9a refs/heads/cassandra-3.X 5439d94c5 -> f1423806e refs/heads/trunk 9a7baa145 -> 48591489d
Revert "Make sure sstables only get committed when it's safe to discard commit log records" This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 as it was using a wrong version of the patch. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f Branch: refs/heads/cassandra-3.11 Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4 Parents: 6f90e55 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Tue Dec 6 14:06:48 2016 +0200 Committer: Branimir Lambov <branimir.lam...@datastax.com> Committed: Tue Dec 6 14:06:48 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 - .../apache/cassandra/db/ColumnFamilyStore.java | 77 ++++------------- src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++++-------- .../miscellaneous/ColumnFamilyStoreTest.java | 90 -------------------- 4 files changed, 63 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5242adf..5cacdd0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,4 @@ 3.0.11 - * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956) * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868) * Nodetool should use a more sane max heap size (CASSANDRA-12739) * LocalToken ensures token values are cloned on heap (CASSANDRA-12651) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 113e10d..d2a51a9 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.sstable.SSTableTxnWriter; import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -82,7 +81,6 @@ import org.json.simple.JSONArray; import org.json.simple.JSONObject; import static org.apache.cassandra.utils.Throwables.maybeFail; -import static org.apache.cassandra.utils.Throwables.merge; public class ColumnFamilyStore implements ColumnFamilyStoreMBean { @@ -126,8 +124,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); - @VisibleForTesting - public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), + private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), StageManager.KEEPALIVE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), @@ -924,9 +921,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { final boolean flushSecondaryIndexes; final OpOrder.Barrier writeBarrier; - final CountDownLatch memtablesFlushLatch = new CountDownLatch(1); - final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1); - volatile Throwable flushFailure = null; + final CountDownLatch latch = new CountDownLatch(1); + volatile FSWriteError flushFailure = null; final List<Memtable> memtables; private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, @@ -947,27 +943,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly * with CL as we do with memtables/CFS-backed SecondaryIndexes. */ - try - { - if (flushSecondaryIndexes) - { - indexManager.flushAllNonCFSBackedIndexesBlocking(); - } - } - catch (Throwable e) - { - flushFailure = merge(flushFailure, e); - } - finally - { - secondaryIndexFlushLatch.countDown(); - } + + if (flushSecondaryIndexes) + indexManager.flushAllNonCFSBackedIndexesBlocking(); try { // we wait on the latch for the commitLogUpperBound to be set, and so that waiters // on this task can rely on all prior flushes being complete - memtablesFlushLatch.await(); + latch.await(); } catch (InterruptedException e) { @@ -986,7 +970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean metric.pendingFlushes.dec(); if (flushFailure != null) - Throwables.propagate(flushFailure); + throw flushFailure; return commitLogUpperBound; } @@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { for (Memtable memtable : memtables) - flushMemtable(memtable); + { + Collection<SSTableReader> readers = Collections.emptyList(); + if (!memtable.isClean() && !truncate) + readers = memtable.flush(); + memtable.cfs.replaceFlushed(memtable, readers); + reclaim(memtable); + } } - catch (Throwable e) + catch (FSWriteError e) { JVMStabilityInspector.inspectThrowable(e); // If we weren't killed, try to continue work but do not allow CommitLog to be discarded. @@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // signal the post-flush we've done our work - postFlush.memtablesFlushLatch.countDown(); - } - - public Collection<SSTableReader> flushMemtable(Memtable memtable) - { - if (memtable.isClean() || truncate) - { - memtable.cfs.replaceFlushed(memtable, Collections.emptyList()); - reclaim(memtable); - return Collections.emptyList(); - } - - Collection<SSTableReader> readers = Collections.emptyList(); - try (SSTableTxnWriter writer = memtable.flush()) - { - try - { - postFlush.secondaryIndexFlushLatch.await(); - } - catch (InterruptedException e) - { - postFlush.flushFailure = merge(postFlush.flushFailure, e); - } - - if (postFlush.flushFailure == null && writer.getFilePointer() > 0) - // sstables should contain non-repaired data. - readers = writer.finish(true); - else - maybeFail(writer.abort(postFlush.flushFailure)); - } - - memtable.cfs.replaceFlushed(memtable, readers); - reclaim(memtable); - return readers; + postFlush.latch.countDown(); } private void reclaim(final Memtable memtable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 6404b37..1a7d6cb 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -48,7 +48,6 @@ import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableTxnWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.DiskAwareRunnable; import org.apache.cassandra.service.ActiveRepairService; @@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable> return partitions.get(key); } - public SSTableTxnWriter flush() + public Collection<SSTableReader> flush() { long estimatedSize = estimatedSize(); Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize); @@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable> * 1.2); // bloom filter and row index overhead } - private SSTableTxnWriter writeSortedContents(File sstableDirectory) + private Collection<SSTableReader> writeSortedContents(File sstableDirectory) { boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME); logger.debug("Writing {}", Memtable.this.toString()); - SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()); - boolean trackContention = logger.isTraceEnabled(); - int heavilyContendedRowCount = 0; - // (we can't clear out the map as-we-go to free up memory, - // since the memtable is being used for queries in the "pending flush" category) - for (AtomicBTreePartition partition : partitions.values()) + Collection<SSTableReader> ssTables; + try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) { - // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2 - // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local, - // we don't need to preserve tombstones for repair. So if both operation are in this - // memtable (which will almost always be the case if there is no ongoing failure), we can - // just skip the entry (CASSANDRA-4667). - if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows()) - continue; - - if (trackContention && partition.usePessimisticLocking()) - heavilyContendedRowCount++; - - if (!partition.isEmpty()) + boolean trackContention = logger.isTraceEnabled(); + int heavilyContendedRowCount = 0; + // (we can't clear out the map as-we-go to free up memory, + // since the memtable is being used for queries in the "pending flush" category) + for (AtomicBTreePartition partition : partitions.values()) { - try (UnfilteredRowIterator iter = partition.unfilteredIterator()) + // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2 + // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local, + // we don't need to preserve tombstones for repair. So if both operation are in this + // memtable (which will almost always be the case if there is no ongoing failure), we can + // just skip the entry (CASSANDRA-4667). + if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows()) + continue; + + if (trackContention && partition.usePessimisticLocking()) + heavilyContendedRowCount++; + + if (!partition.isEmpty()) { - writer.append(iter); + try (UnfilteredRowIterator iter = partition.unfilteredIterator()) + { + writer.append(iter); + } } } - } - if (writer.getFilePointer() > 0) - logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s", - writer.getFilename(), - FBUtilities.prettyPrintMemory(writer.getFilePointer()), - commitLogUpperBound)); - else - logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", - writer.getFilename(), commitLogUpperBound); + if (writer.getFilePointer() > 0) + { + logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s", + writer.getFilename(), + FBUtilities.prettyPrintMemory(writer.getFilePointer()), + commitLogUpperBound)); - if (heavilyContendedRowCount > 0) - logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); + // sstables should contain non-repaired data. + ssTables = writer.finish(true); + } + else + { + logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", + writer.getFilename(), commitLogUpperBound); + writer.abort(); + ssTables = Collections.emptyList(); + } - return writer; + if (heavilyContendedRowCount > 0) + logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); + + return ssTables; + } } @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java deleted file mode 100644 index 1285392..0000000 --- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.cql3.validation.miscellaneous; - -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import org.junit.Test; - -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.index.StubIndex; -import org.apache.cassandra.schema.IndexMetadata; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class ColumnFamilyStoreTest extends CQLTester -{ - @Test - public void testFailing2iFlush() throws Throwable - { - createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)"); - createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'"); - - for (int i = 0; i < 10; i++) - execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i); - - try - { - getCurrentColumnFamilyStore().forceBlockingFlush(); - } - catch (Throwable t) - { - // ignore - } - - // Make sure there's no flush running - waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0, - TimeUnit.SECONDS.toMillis(5)); - - // SSTables remain uncommitted. - assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length); - } - - public void waitFor(Supplier<Boolean> condition, long timeout) - { - long start = System.currentTimeMillis(); - while(true) - { - if (condition.get()) - return; - - assertTrue("Timeout ocurred while waiting for condition", - System.currentTimeMillis() - start < timeout); - } - } - - // Used for index creation above - public static class BrokenCustom2I extends StubIndex - { - public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata) - { - super(baseCfs, metadata); - } - - public Callable<?> getBlockingFlushTask() - { - throw new RuntimeException(); - } - } -}