markCompacting succeeds only if the exact SSTableReader instances being marked are present in the live set
patch by benedict; reviewed by tjake for CASSANDRA-8689 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec7fba44 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec7fba44 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec7fba44 Branch: refs/heads/trunk Commit: ec7fba44e613815c472e6cd2744058ba324df056 Parents: f3c0e11 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Mar 4 16:06:37 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Mar 4 16:06:37 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../org/apache/cassandra/db/DataTracker.java | 75 ++++++++++-------- .../SizeTieredCompactionStrategy.java | 3 +- .../cassandra/io/sstable/SSTableRewriter.java | 2 +- .../unit/org/apache/cassandra/SchemaLoader.java | 4 +- .../io/sstable/IndexSummaryManagerTest.java | 82 +++++++++++++++++++- .../io/sstable/SSTableRewriterTest.java | 7 +- 8 files changed, 136 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3b373ae..52f33b3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,8 @@ * Show progress of streaming in nodetool netstats (CASSANDRA-8886) * IndexSummaryBuilder utilises offheap memory, and shares data between each IndexSummary opened from it (CASSANDRA-8757) + * markCompacting only succeeds if the exact SSTableReader instances being + marked are in the live set (CASSANDRA-8689) Merged from 2.0: * Add offline tool to relevel sstables (CASSANDRA-8301) * Preserve stream ID for more protocol errors (CASSANDRA-8848) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 38c5dbe..1f03090 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2616,7 +2616,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public Iterable<SSTableReader> call() throws Exception { assert data.getCompacting().isEmpty() : data.getCompacting(); - Iterable<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables())); + Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables())); if (Iterables.isEmpty(sstables)) return Collections.emptyList(); boolean success = data.markCompacting(sstables); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index 81964f9..2c2053c 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -23,6 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.slf4j.Logger; @@ -191,30 +192,32 @@ public class DataTracker * unmarkCompacting, but since we will never call markObsolete on a sstable marked * as compacting (unless there is a serious bug), we can skip this. 
*/ - public boolean markCompacting(Iterable<SSTableReader> sstables) + public boolean markCompacting(Collection<SSTableReader> sstables) + { + return markCompacting(sstables, false); + } + public boolean markCompacting(Collection<SSTableReader> sstables, boolean newTables) { assert sstables != null && !Iterables.isEmpty(sstables); while (true) { - View currentView = view.get(); - Set<SSTableReader> set = ImmutableSet.copyOf(sstables); - Set<SSTableReader> inactive = Sets.difference(set, currentView.compacting); - if (inactive.size() < set.size()) + final View currentView = view.get(); + if (Iterables.any(sstables, Predicates.in(currentView.compacting))) return false; - if (Iterables.any(set, new Predicate<SSTableReader>() + Predicate live = new Predicate<SSTableReader>() { - @Override public boolean apply(SSTableReader sstable) { - return sstable.isMarkedCompacted(); + return currentView.sstablesMap.get(sstable) == sstable && !sstable.isMarkedCompacted(); } - })) - { + }; + if (newTables) + assert !Iterables.any(sstables, Predicates.in(currentView.sstables)); + else if (!Iterables.all(sstables, live)) return false; - } - View newView = currentView.markCompacting(set); + View newView = currentView.markCompacting(sstables); if (view.compareAndSet(currentView, newView)) return true; } @@ -376,12 +379,12 @@ public class DataTracker void init() { view.set(new View( - ImmutableList.of(new Memtable(cfstore)), - ImmutableList.<Memtable>of(), - Collections.<SSTableReader>emptySet(), - Collections.<SSTableReader>emptySet(), - Collections.<SSTableReader>emptySet(), - SSTableIntervalTree.empty())); + ImmutableList.of(new Memtable(cfstore)), + ImmutableList.<Memtable>of(), + Collections.<SSTableReader, SSTableReader>emptyMap(), + Collections.<SSTableReader>emptySet(), + Collections.<SSTableReader>emptySet(), + SSTableIntervalTree.empty())); } /** @@ -613,12 +616,17 @@ public class DataTracker private final List<Memtable> flushingMemtables; public final Set<SSTableReader> 
compacting; public final Set<SSTableReader> sstables; + // we use a Map here so that we can easily perform identity checks as well as equality checks. + // When marking compacting, we now indicate if we expect the sstables to be present (by default we do), + // and we then check that not only are they all present in the live set, but that the exact instance present is + // the one we made our decision to compact against. + public final Map<SSTableReader, SSTableReader> sstablesMap; // all sstables that are still in the live set, but have been completely shadowed by a replacement sstable public final Set<SSTableReader> shadowed; public final SSTableIntervalTree intervalTree; - View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree) + View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree) { this.shadowed = shadowed; assert liveMemtables != null; @@ -629,7 +637,9 @@ public class DataTracker this.liveMemtables = liveMemtables; this.flushingMemtables = flushingMemtables; - this.sstables = sstables; + + this.sstablesMap = sstables; + this.sstables = sstablesMap.keySet(); this.compacting = compacting; this.intervalTree = intervalTree; } @@ -669,7 +679,7 @@ public class DataTracker View switchMemtable(Memtable newMemtable) { List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build(); - return new View(newLiveMemtables, flushingMemtables, sstables, compacting, shadowed, intervalTree); + return new View(newLiveMemtables, flushingMemtables, sstablesMap, compacting, shadowed, intervalTree); } View markFlushing(Memtable toFlushMemtable) @@ -696,7 +706,7 @@ public class DataTracker .addAll(flushing.subList(i, flushing.size())) .build(); - 
return new View(newLive, newFlushing, sstables, compacting, shadowed, intervalTree); + return new View(newLive, newFlushing, sstablesMap, compacting, shadowed, intervalTree); } View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable) @@ -706,14 +716,15 @@ public class DataTracker .addAll(flushingMemtables.subList(0, index)) .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size())) .build(); - Set<SSTableReader> newSSTables = sstables; + Map<SSTableReader, SSTableReader> newSSTables = sstablesMap; SSTableIntervalTree intervalTree = this.intervalTree; if (newSSTable != null) { assert !sstables.contains(newSSTable); assert !shadowed.contains(newSSTable); - newSSTables = ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build(); - intervalTree = buildIntervalTree(newSSTables); + newSSTables = ImmutableMap.<SSTableReader, SSTableReader>builder() + .putAll(sstablesMap).put(newSSTable, newSSTable).build(); + intervalTree = buildIntervalTree(newSSTables.keySet()); } return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, shadowed, intervalTree); } @@ -723,12 +734,12 @@ public class DataTracker ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables); int newSSTablesSize = shadowed.size() + sstables.size() - oldSSTables.size() + Iterables.size(replacements); assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this); - Set<SSTableReader> newSSTables = new HashSet<>(newSSTablesSize); + Map<SSTableReader, SSTableReader> newSSTables = new HashMap<>(newSSTablesSize); Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size()); for (SSTableReader sstable : sstables) if (!oldSet.contains(sstable)) - newSSTables.add(sstable); + newSSTables.put(sstable, sstable); for (SSTableReader sstable : shadowed) if (!oldSet.contains(sstable)) @@ -739,28 +750,28 @@ public class DataTracker if 
(replacement.openReason == SSTableReader.OpenReason.SHADOWED) newShadowed.add(replacement); else - newSSTables.add(replacement); + newSSTables.put(replacement, replacement); } assert newSSTables.size() + newShadowed.size() == newSSTablesSize : String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", newSSTablesSize, newSSTables.size() + newShadowed.size(), oldSSTables, replacements, this); - newSSTables = ImmutableSet.copyOf(newSSTables); newShadowed = ImmutableSet.copyOf(newShadowed); - SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables); + newSSTables = ImmutableMap.copyOf(newSSTables); + SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables.keySet()); return new View(liveMemtables, flushingMemtables, newSSTables, compacting, newShadowed, intervalTree); } View markCompacting(Collection<SSTableReader> tomark) { Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build(); - return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree); + return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree); } View unmarkCompacting(Iterable<SSTableReader> tounmark) { Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark))); - return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree); + return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree); } private Set<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 19abd9c..93484e8 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -21,6 +21,7 @@ import java.util.*; import java.util.Map.Entry; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -196,7 +197,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables); if (Iterables.isEmpty(filteredSSTables)) return null; - if (!cfs.getDataTracker().markCompacting(filteredSSTables)) + if (!cfs.getDataTracker().markCompacting(ImmutableList.copyOf(filteredSSTables))) return null; return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, filteredSSTables, gcBefore, false)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index be1085b..914ce1f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -351,7 +351,7 @@ public class SSTableRewriter } else { - dataTracker.markCompacting(Collections.singleton(replaceWith)); + dataTracker.markCompacting(Collections.singleton(replaceWith), true); toReplaceSet = Collections.emptySet(); } dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith)); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index ce65d5a..db1758f 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -189,7 +189,9 @@ public class SchemaLoader standardCFMD(ks1, "StandardLowIndexInterval").minIndexInterval(8) .maxIndexInterval(256) .caching(CachingOptions.NONE), - + standardCFMD(ks1, "StandardRace").minIndexInterval(8) + .maxIndexInterval(256) + .caching(CachingOptions.NONE), standardCFMD(ks1, "UUIDKeys").keyValidator(UUIDType.instance), CFMetaData.denseCFMetaData(ks1, "MixedTypes", LongType.instance).keyValidator(UUIDType.instance).defaultValidator(BooleanType.instance), CFMetaData.denseCFMetaData(ks1, "MixedTypesComposite", composite).keyValidator(composite).defaultValidator(BooleanType.instance), http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 0bb9d5f..dec7705 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -20,22 +20,27 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.After; import org.junit.Before; import 
org.junit.Test; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import junit.framework.Assert; +import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.OpOrder; import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD; @@ -46,6 +51,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +@RunWith(OrderedJUnit4ClassRunner.class) public class IndexSummaryManagerTest extends SchemaLoader { private static final Logger logger = LoggerFactory.getLogger(IndexSummaryManagerTest.class); @@ -83,7 +89,6 @@ public class IndexSummaryManagerTest extends SchemaLoader long total = 0; for (SSTableReader sstable : sstables) total += sstable.getIndexSummaryOffHeapSize(); - return total; } @@ -494,4 +499,73 @@ public class IndexSummaryManagerTest extends SchemaLoader assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval()); } } + + //This test runs last, since cleaning up compactions and tp is a pain + @Test + public void testCompactionRace() throws InterruptedException, ExecutionException + { + String ksname = "Keyspace1"; + String cfname = "StandardRace"; // index interval of 8, no key caching + Keyspace keyspace = Keyspace.open(ksname); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname); + int numSSTables = 20; + int numRows = 28; + createSSTables(ksname, cfname, numSSTables, numRows); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables()); + + 
ExecutorService tp = Executors.newFixedThreadPool(2); + + final AtomicBoolean failed = new AtomicBoolean(false); + + for (int i = 0; i < 2; i++) + { + tp.submit(new Runnable() + { + @Override + public void run() + { + while(!failed.get()) + { + try + { + IndexSummaryManager.instance.redistributeSummaries(); + } catch (Throwable e) + { + failed.set(true); + } + } + } + }); + } + + while ( cfs.getSSTables().size() != 1 ) + cfs.forceMajorCompaction(); + + try + { + Assert.assertFalse(failed.get()); + + for (SSTableReader sstable : sstables) + { + Assert.assertEquals(true, sstable.isMarkedCompacted()); + } + + Assert.assertEquals(20, sstables.size()); + + try + { + totalOffHeapSize(sstables); + Assert.fail("This should have failed"); + } catch (AssertionError e) + { + + } + } + finally + { + tp.shutdownNow(); + CompactionManager.instance.finishCompactionsAndShutdown(10, TimeUnit.SECONDS); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 258b6b5..6c96905 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -540,7 +540,7 @@ public class SSTableRewriterTest extends SchemaLoader cfs.truncateBlocking(); cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 1000); - cfs.getDataTracker().markCompacting(Arrays.asList(s)); + cfs.getDataTracker().markCompacting(Arrays.asList(s), true); SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10); splitter.split(); Thread.sleep(1000); @@ -584,6 +584,7 @@ public class SSTableRewriterTest extends SchemaLoader if (!offline) cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); + 
cfs.getDataTracker().markCompacting(compacting); SSTableRewriter.overrideOpenInterval(10000000); SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline); SSTableWriter w = getWriter(cfs, s.descriptor.directory); @@ -608,6 +609,10 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.abort(); } } + finally + { + cfs.getDataTracker().unmarkCompacting(compacting); + } Thread.sleep(1000); int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); assertEquals(filecount, 1);