markCompacting only succeeds if the exact SSTableReader instances being
marked are in the live set

patch by benedict; reviewed by tjake for CASSANDRA-8689


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec7fba44
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec7fba44
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec7fba44

Branch: refs/heads/trunk
Commit: ec7fba44e613815c472e6cd2744058ba324df056
Parents: f3c0e11
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Wed Mar 4 16:06:37 2015 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Wed Mar 4 16:06:37 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  2 +-
 .../org/apache/cassandra/db/DataTracker.java    | 75 ++++++++++--------
 .../SizeTieredCompactionStrategy.java           |  3 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  4 +-
 .../io/sstable/IndexSummaryManagerTest.java     | 82 +++++++++++++++++++-
 .../io/sstable/SSTableRewriterTest.java         |  7 +-
 8 files changed, 136 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b373ae..52f33b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,8 @@
  * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
  * IndexSummaryBuilder utilises offheap memory, and shares data between
    each IndexSummary opened from it (CASSANDRA-8757)
+ * markCompacting only succeeds if the exact SSTableReader instances being 
+   marked are in the live set (CASSANDRA-8689)
 Merged from 2.0:
  * Add offline tool to relevel sstables (CASSANDRA-8301)
  * Preserve stream ID for more protocol errors (CASSANDRA-8848)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 38c5dbe..1f03090 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2616,7 +2616,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             public Iterable<SSTableReader> call() throws Exception
             {
                 assert data.getCompacting().isEmpty() : data.getCompacting();
-                Iterable<SSTableReader> sstables = 
Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
+                Collection<SSTableReader> sstables = 
Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
                 if (Iterables.isEmpty(sstables))
                     return Collections.emptyList();
                 boolean success = data.markCompacting(sstables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 81964f9..2c2053c 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
@@ -191,30 +192,32 @@ public class DataTracker
      * unmarkCompacting, but since we will never call markObsolete on a 
sstable marked
      * as compacting (unless there is a serious bug), we can skip this.
      */
-    public boolean markCompacting(Iterable<SSTableReader> sstables)
+    public boolean markCompacting(Collection<SSTableReader> sstables)
+    {
+        return markCompacting(sstables, false);
+    }
+    public boolean markCompacting(Collection<SSTableReader> sstables, boolean 
newTables)
     {
         assert sstables != null && !Iterables.isEmpty(sstables);
         while (true)
         {
-            View currentView = view.get();
-            Set<SSTableReader> set = ImmutableSet.copyOf(sstables);
-            Set<SSTableReader> inactive = Sets.difference(set, 
currentView.compacting);
-            if (inactive.size() < set.size())
+            final View currentView = view.get();
+            if (Iterables.any(sstables, Predicates.in(currentView.compacting)))
                 return false;
 
-            if (Iterables.any(set, new Predicate<SSTableReader>()
+            Predicate live = new Predicate<SSTableReader>()
             {
-                @Override
                 public boolean apply(SSTableReader sstable)
                 {
-                    return sstable.isMarkedCompacted();
+                    return currentView.sstablesMap.get(sstable) == sstable && 
!sstable.isMarkedCompacted();
                 }
-            }))
-            {
+            };
+            if (newTables)
+                assert !Iterables.any(sstables, 
Predicates.in(currentView.sstables));
+            else if (!Iterables.all(sstables, live))
                 return false;
-            }
 
-            View newView = currentView.markCompacting(set);
+            View newView = currentView.markCompacting(sstables);
             if (view.compareAndSet(currentView, newView))
                 return true;
         }
@@ -376,12 +379,12 @@ public class DataTracker
     void init()
     {
         view.set(new View(
-                ImmutableList.of(new Memtable(cfstore)),
-                ImmutableList.<Memtable>of(),
-                Collections.<SSTableReader>emptySet(),
-                Collections.<SSTableReader>emptySet(),
-                Collections.<SSTableReader>emptySet(),
-                SSTableIntervalTree.empty()));
+                         ImmutableList.of(new Memtable(cfstore)),
+                         ImmutableList.<Memtable>of(),
+                         Collections.<SSTableReader, SSTableReader>emptyMap(),
+                         Collections.<SSTableReader>emptySet(),
+                         Collections.<SSTableReader>emptySet(),
+                         SSTableIntervalTree.empty()));
     }
 
     /**
@@ -613,12 +616,17 @@ public class DataTracker
         private final List<Memtable> flushingMemtables;
         public final Set<SSTableReader> compacting;
         public final Set<SSTableReader> sstables;
+        // We use a Map here so that we can easily perform identity checks as well as equality checks.
+        // When marking compacting, we now indicate whether we expect the sstables to be present (by default we do),
+        // and we then check not only that they are all present in the live set, but also that the exact instance present
+        // is the one we made our decision to compact against.
+        public final Map<SSTableReader, SSTableReader> sstablesMap;
 
         // all sstables that are still in the live set, but have been 
completely shadowed by a replacement sstable
         public final Set<SSTableReader> shadowed;
         public final SSTableIntervalTree intervalTree;
 
-        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, 
Set<SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> 
shadowed, SSTableIntervalTree intervalTree)
+        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, 
Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, 
Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree)
         {
             this.shadowed = shadowed;
             assert liveMemtables != null;
@@ -629,7 +637,9 @@ public class DataTracker
 
             this.liveMemtables = liveMemtables;
             this.flushingMemtables = flushingMemtables;
-            this.sstables = sstables;
+
+            this.sstablesMap = sstables;
+            this.sstables = sstablesMap.keySet();
             this.compacting = compacting;
             this.intervalTree = intervalTree;
         }
@@ -669,7 +679,7 @@ public class DataTracker
         View switchMemtable(Memtable newMemtable)
         {
             List<Memtable> newLiveMemtables = 
ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
-            return new View(newLiveMemtables, flushingMemtables, sstables, 
compacting, shadowed, intervalTree);
+            return new View(newLiveMemtables, flushingMemtables, sstablesMap, 
compacting, shadowed, intervalTree);
         }
 
         View markFlushing(Memtable toFlushMemtable)
@@ -696,7 +706,7 @@ public class DataTracker
                                                       
.addAll(flushing.subList(i, flushing.size()))
                                                       .build();
 
-            return new View(newLive, newFlushing, sstables, compacting, 
shadowed, intervalTree);
+            return new View(newLive, newFlushing, sstablesMap, compacting, 
shadowed, intervalTree);
         }
 
         View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
@@ -706,14 +716,15 @@ public class DataTracker
                                                              
.addAll(flushingMemtables.subList(0, index))
                                                              
.addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
                                                              .build();
-            Set<SSTableReader> newSSTables = sstables;
+            Map<SSTableReader, SSTableReader> newSSTables = sstablesMap;
             SSTableIntervalTree intervalTree = this.intervalTree;
             if (newSSTable != null)
             {
                 assert !sstables.contains(newSSTable);
                 assert !shadowed.contains(newSSTable);
-                newSSTables = 
ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
-                intervalTree = buildIntervalTree(newSSTables);
+                newSSTables = ImmutableMap.<SSTableReader, 
SSTableReader>builder()
+                                          .putAll(sstablesMap).put(newSSTable, 
newSSTable).build();
+                intervalTree = buildIntervalTree(newSSTables.keySet());
             }
             return new View(liveMemtables, newQueuedMemtables, newSSTables, 
compacting, shadowed, intervalTree);
         }
@@ -723,12 +734,12 @@ public class DataTracker
             ImmutableSet<SSTableReader> oldSet = 
ImmutableSet.copyOf(oldSSTables);
             int newSSTablesSize = shadowed.size() + sstables.size() - 
oldSSTables.size() + Iterables.size(replacements);
             assert newSSTablesSize >= Iterables.size(replacements) : 
String.format("Incoherent new size %d replacing %s by %s in %s", 
newSSTablesSize, oldSSTables, replacements, this);
-            Set<SSTableReader> newSSTables = new HashSet<>(newSSTablesSize);
+            Map<SSTableReader, SSTableReader> newSSTables = new 
HashMap<>(newSSTablesSize);
             Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size());
 
             for (SSTableReader sstable : sstables)
                 if (!oldSet.contains(sstable))
-                    newSSTables.add(sstable);
+                    newSSTables.put(sstable, sstable);
 
             for (SSTableReader sstable : shadowed)
                 if (!oldSet.contains(sstable))
@@ -739,28 +750,28 @@ public class DataTracker
                 if (replacement.openReason == 
SSTableReader.OpenReason.SHADOWED)
                     newShadowed.add(replacement);
                 else
-                    newSSTables.add(replacement);
+                    newSSTables.put(replacement, replacement);
             }
 
             assert newSSTables.size() + newShadowed.size() == newSSTablesSize :
                 String.format("Expecting new size of %d, got %d while 
replacing %s by %s in %s",
                           newSSTablesSize, newSSTables.size() + 
newShadowed.size(), oldSSTables, replacements, this);
-            newSSTables = ImmutableSet.copyOf(newSSTables);
             newShadowed = ImmutableSet.copyOf(newShadowed);
-            SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
+            newSSTables = ImmutableMap.copyOf(newSSTables);
+            SSTableIntervalTree intervalTree = 
buildIntervalTree(newSSTables.keySet());
             return new View(liveMemtables, flushingMemtables, newSSTables, 
compacting, newShadowed, intervalTree);
         }
 
         View markCompacting(Collection<SSTableReader> tomark)
         {
             Set<SSTableReader> compactingNew = 
ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
-            return new View(liveMemtables, flushingMemtables, sstables, 
compactingNew, shadowed, intervalTree);
+            return new View(liveMemtables, flushingMemtables, sstablesMap, 
compactingNew, shadowed, intervalTree);
         }
 
         View unmarkCompacting(Iterable<SSTableReader> tounmark)
         {
             Set<SSTableReader> compactingNew = 
ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
-            return new View(liveMemtables, flushingMemtables, sstables, 
compactingNew, shadowed, intervalTree);
+            return new View(liveMemtables, flushingMemtables, sstablesMap, 
compactingNew, shadowed, intervalTree);
         }
 
         private Set<SSTableReader> newSSTables(Collection<SSTableReader> 
oldSSTables, Iterable<SSTableReader> replacements)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 19abd9c..93484e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -196,7 +197,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         Iterable<SSTableReader> filteredSSTables = 
filterSuspectSSTables(sstables);
         if (Iterables.isEmpty(filteredSSTables))
             return null;
-        if (!cfs.getDataTracker().markCompacting(filteredSSTables))
+        if 
(!cfs.getDataTracker().markCompacting(ImmutableList.copyOf(filteredSSTables)))
             return null;
         return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, 
filteredSSTables, gcBefore, false));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index be1085b..914ce1f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -351,7 +351,7 @@ public class SSTableRewriter
         }
         else
         {
-            dataTracker.markCompacting(Collections.singleton(replaceWith));
+            dataTracker.markCompacting(Collections.singleton(replaceWith), 
true);
             toReplaceSet = Collections.emptySet();
         }
         dataTracker.replaceEarlyOpenedFiles(toReplaceSet, 
Collections.singleton(replaceWith));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index ce65d5a..db1758f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -189,7 +189,9 @@ public class SchemaLoader
                                            standardCFMD(ks1, 
"StandardLowIndexInterval").minIndexInterval(8)
                                                                                
         .maxIndexInterval(256)
                                                                                
         .caching(CachingOptions.NONE),
-
+                                           standardCFMD(ks1, 
"StandardRace").minIndexInterval(8)
+                                                                            
.maxIndexInterval(256)
+                                                                            
.caching(CachingOptions.NONE),
                                            standardCFMD(ks1, 
"UUIDKeys").keyValidator(UUIDType.instance),
                                            CFMetaData.denseCFMetaData(ks1, 
"MixedTypes", 
LongType.instance).keyValidator(UUIDType.instance).defaultValidator(BooleanType.instance),
                                            CFMetaData.denseCFMetaData(ks1, 
"MixedTypesComposite", 
composite).keyValidator(composite).defaultValidator(BooleanType.instance),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 0bb9d5f..dec7705 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -20,22 +20,27 @@ package org.apache.cassandra.io.sstable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import junit.framework.Assert;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static 
org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
@@ -46,6 +51,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(OrderedJUnit4ClassRunner.class)
 public class IndexSummaryManagerTest extends SchemaLoader
 {
     private static final Logger logger = 
LoggerFactory.getLogger(IndexSummaryManagerTest.class);
@@ -83,7 +89,6 @@ public class IndexSummaryManagerTest extends SchemaLoader
         long total = 0;
         for (SSTableReader sstable : sstables)
             total += sstable.getIndexSummaryOffHeapSize();
-
         return total;
     }
 
@@ -494,4 +499,73 @@ public class IndexSummaryManagerTest extends SchemaLoader
                 assertTrue(entry.getValue() >= 
cfs.metadata.getMinIndexInterval());
         }
     }
+
+    // This test runs last, since cleaning up compactions and the thread pool is a pain
+    @Test
+    public void testCompactionRace() throws InterruptedException, 
ExecutionException
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardRace"; // index interval of 8, no key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        int numSSTables = 20;
+        int numRows = 28;
+        createSSTables(ksname, cfname, numSSTables, numRows);
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+
+        ExecutorService tp = Executors.newFixedThreadPool(2);
+
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        for (int i = 0; i < 2; i++)
+        {
+            tp.submit(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    while(!failed.get())
+                    {
+                        try
+                        {
+                            
IndexSummaryManager.instance.redistributeSummaries();
+                        } catch (Throwable e)
+                        {
+                            failed.set(true);
+                        }
+                    }
+                }
+            });
+        }
+
+        while ( cfs.getSSTables().size() != 1 )
+            cfs.forceMajorCompaction();
+
+        try
+        {
+            Assert.assertFalse(failed.get());
+
+            for (SSTableReader sstable : sstables)
+            {
+                Assert.assertEquals(true, sstable.isMarkedCompacted());
+            }
+
+            Assert.assertEquals(20, sstables.size());
+
+            try
+            {
+                totalOffHeapSize(sstables);
+                Assert.fail("This should have failed");
+            } catch (AssertionError e)
+            {
+
+            }
+        }
+        finally
+        {
+            tp.shutdownNow();
+            CompactionManager.instance.finishCompactionsAndShutdown(10, 
TimeUnit.SECONDS);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 258b6b5..6c96905 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -540,7 +540,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.truncateBlocking();
         cfs.disableAutoCompaction();
         SSTableReader s = writeFile(cfs, 1000);
-        cfs.getDataTracker().markCompacting(Arrays.asList(s));
+        cfs.getDataTracker().markCompacting(Arrays.asList(s), true);
         SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10);
         splitter.split();
         Thread.sleep(1000);
@@ -584,6 +584,7 @@ public class SSTableRewriterTest extends SchemaLoader
         if (!offline)
             cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
+        cfs.getDataTracker().markCompacting(compacting);
         SSTableRewriter.overrideOpenInterval(10000000);
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
offline);
         SSTableWriter w = getWriter(cfs, s.descriptor.directory);
@@ -608,6 +609,10 @@ public class SSTableRewriterTest extends SchemaLoader
                 rewriter.abort();
             }
         }
+        finally
+        {
+            cfs.getDataTracker().unmarkCompacting(compacting);
+        }
         Thread.sleep(1000);
         int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
         assertEquals(filecount, 1);

Reply via email to