Fix potential infinite loop when reloading CFS. Patch by slebresne; reviewed by jbellis for CASSANDRA-5064.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b217e46 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b217e46 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b217e46 Branch: refs/heads/trunk Commit: 9b217e4676187cd6b4f05b6724c199dd15e03f73 Parents: ab071c5 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Dec 17 11:29:15 2012 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Mon Dec 17 11:29:15 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 99 +++++---------- src/java/org/apache/cassandra/db/DefsTable.java | 5 +- src/java/org/apache/cassandra/db/Memtable.java | 12 -- src/java/org/apache/cassandra/db/Table.java | 6 +- .../apache/cassandra/service/StorageService.java | 6 +- .../org/apache/cassandra/streaming/StreamOut.java | 6 +- 7 files changed, 36 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9a9df1b..327d427 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ -1.2.0 +1.2.0-rc2 * cqlsh: add DESCRIBE KEYSPACES command (CASSANDRA-5060) + * Fix potential infinite loop when reloading CFS (CASSANDRA-5064) Merged from 1.1: * fix temporarily missing schema after upgrade from pre-1.1.5 (CASSANDRA-5061) * Fix ALTER TABLE overriding compression options with defaults http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 
9cb4c66..364565f 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -81,10 +81,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); /* - * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete, + * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete, * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads. * - * There are two other things that maybeSwitchMemtable does. + * There are two other things that switchMemtable does. * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete * and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater * that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes @@ -157,34 +157,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean indexManager.reload(); // If the CF comparator has changed, we need to change the memtable, - // because the old one still aliases the previous comparator. We don't - // call forceFlush() because it can skip the switch if the memtable is - // clean, which we don't want here. Also, because there can be a race - // between the time we acquire the current memtable and we flush it - // (another thread can have flushed it first), we attempt the switch - // until we know the memtable has the current comparator. 
- try - { - while (true) - { - AbstractType comparator = metadata.comparator; - Memtable memtable = getMemtableThreadSafe(); - if (memtable.initialComparator == comparator) - break; - - Future future = maybeSwitchMemtable(getMemtableThreadSafe(), true); - if (future != null) - future.get(); - } - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } + // because the old one still aliases the previous comparator. + if (getMemtableThreadSafe().initialComparator != metadata.comparator) + switchMemtable(true, true); } private void maybeReloadCompactionStrategy() @@ -610,15 +585,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return desc.filenameFor(Component.DATA); } - /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */ - public Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog) + /** + * Switch and flush the current memtable, if it was dirty. The forceSwitch + * flag allow to force switching the memtable even if it is clean (though + * in that case we don't flush, as there is no point). + */ + public Future<?> switchMemtable(final boolean writeCommitLog, boolean forceSwitch) { - if (oldMemtable.isFrozen()) - { - logger.debug("memtable is already frozen; another thread must be flushing it"); - return null; - } - /* * If we can get the writelock, that means no new updates can come in and * all ongoing updates to memtables have completed. We can get the tail @@ -632,13 +605,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Table.switchLock.writeLock().lock(); try { - if (oldMemtable.isFrozen()) - { - logger.debug("memtable is already frozen; another thread must be flushing it"); - return null; - } - - assert getMemtableThreadSafe() == oldMemtable; final Future<ReplayPosition> ctx = writeCommitLog ? 
CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE); // submit the memtable for any indexed sub-cfses, and our own. @@ -646,20 +612,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not for (ColumnFamilyStore cfs : concatWithIndexes()) { - Memtable mt = cfs.getMemtableThreadSafe(); - if (!mt.isClean() && !mt.isFrozen()) - { - // We need to freeze indexes too because they can be concurrently flushed too (#3547) - mt.freeze(); + if (forceSwitch || !cfs.getMemtableThreadSafe().isClean()) icc.add(cfs); - } } + final CountDownLatch latch = new CountDownLatch(icc.size()); for (ColumnFamilyStore cfs : icc) { Memtable memtable = cfs.data.switchMemtable(); - logger.info("Enqueuing flush of {}", memtable); - memtable.flushAndSignal(latch, flushWriter, ctx); + // With forceSwitch it's possible to get a clean memtable here. + // In that case, since we've switched it already, just remove + // it from the memtable pending flush right away. 
+ if (memtable.isClean()) + { + cfs.replaceFlushed(memtable, null); + latch.countDown(); + } + else + { + logger.info("Enqueuing flush of {}", memtable); + memtable.flushAndSignal(latch, flushWriter, ctx); + } } if (metric.memtableSwitchCount.count() == Long.MAX_VALUE) @@ -713,17 +686,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (clean) { logger.debug("forceFlush requested but everything is clean in {}", columnFamily); - return null; + return Futures.immediateCheckedFuture(null); } - return maybeSwitchMemtable(getMemtableThreadSafe(), true); + return switchMemtable(true, false); } public void forceBlockingFlush() throws ExecutionException, InterruptedException { - Future<?> future = forceFlush(); - if (future != null) - future.get(); + forceFlush().get(); } public void maybeUpdateRowCache(DecoratedKey key, ColumnFamily columnFamily) @@ -1046,16 +1017,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return (int) metric.memtableSwitchCount.count(); } - /** - * get the current memtable in a threadsafe fashion. note that simply "return memtable_" is - * incorrect; you need to lock to introduce a thread safe happens-before ordering. - * - * do NOT use this method to do either a put or get on the memtable object, since it could be - * flushed in the meantime (and its executor terminated). - * - * also do NOT make this method public or it will really get impossible to reason about these things. 
- * @return - */ private Memtable getMemtableThreadSafe() { return data.getMemtable(); @@ -1783,7 +1744,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (ColumnFamilyStore cfs : concatWithIndexes()) { Memtable mt = cfs.getMemtableThreadSafe(); - if (!mt.isClean() && !mt.isFrozen()) + if (!mt.isClean()) { mt.cfs.data.renewMemtable(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java index a9f6427..30614b7 100644 --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@ -634,10 +634,7 @@ public class DefsTable private static void flushSchemaCF(String cfName) { - Future<?> flush = SystemTable.schemaCFS(cfName).forceFlush(); - - if (flush != null) - FBUtilities.waitOnFuture(flush); + FBUtilities.waitOnFuture(SystemTable.schemaCFS(cfName).forceFlush()); } private static ByteBuffer toUTF8Bytes(UUID version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 82d22ca..56e2bf5 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -81,7 +81,6 @@ public class Memtable volatile static Memtable activelyMeasuring; - private volatile boolean isFrozen; private final AtomicLong currentSize = new AtomicLong(0); private final AtomicLong currentOperations = new AtomicLong(0); @@ -141,16 +140,6 @@ public class Memtable return currentOperations.get(); } - boolean isFrozen() - { - return isFrozen; - } - - void freeze() - { - isFrozen = true; - } - /** * Should only be 
called by ColumnFamilyStore.apply. NOT a public API. * (CFS handles locking to avoid submitting an op @@ -158,7 +147,6 @@ public class Memtable */ void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer) { - assert !isFrozen; // not 100% foolproof but hell, it's an assert resolve(key, columnFamily, indexer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index 18b7e4b..bb5e6ee 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -450,11 +450,7 @@ public class Table { List<Future<?>> futures = new ArrayList<Future<?>>(); for (UUID cfId : columnFamilyStores.keySet()) - { - Future<?> future = columnFamilyStores.get(cfId).forceFlush(); - if (future != null) - futures.add(future); - } + futures.add(columnFamilyStores.get(cfId).forceFlush()); return futures; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 08dc5fc..5577680 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -479,11 +479,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe if (!ksm.durableWrites) { for (ColumnFamilyStore cfs : table.getColumnFamilyStores()) - { - Future<?> future = cfs.forceFlush(); - if (future != null) - flushes.add(future); - } + flushes.add(cfs.forceFlush()); } } FBUtilities.waitOnFutures(flushes); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/streaming/StreamOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java index 2ade0c6..7043be4 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOut.java +++ b/src/java/org/apache/cassandra/streaming/StreamOut.java @@ -93,11 +93,7 @@ public class StreamOut logger.info("Flushing memtables for {}...", stores); List<Future<?>> flushes = new ArrayList<Future<?>>(); for (ColumnFamilyStore cfstore : stores) - { - Future<?> flush = cfstore.forceFlush(); - if (flush != null) - flushes.add(flush); - } + flushes.add(cfstore.forceFlush()); FBUtilities.waitOnFutures(flushes); }