Add unit test for CASSANDRA-11548

Patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11548


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/209ebd38
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/209ebd38
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/209ebd38

Branch: refs/heads/cassandra-3.0
Commit: 209ebd380b641c4f065e9687186f546f8a50b242
Parents: d200d13
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Mon Apr 18 18:44:07 2016 -0300
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Apr 19 15:42:36 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +-
 .../org/apache/cassandra/db/DataTracker.java    |  12 +++
 .../SSTableCompactingNotification.java          |  41 ++++++++
 .../LongLeveledCompactionStrategyTest.java      | 101 +++++++++++++++++++
 4 files changed, 155 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 90a4f23..76d3673 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,5 @@
-2.1.15
- * Replace sstables on DataTracker before marking them as non-compacting 
during anti-compaction (CASSANDRA-11548)
-
 2.1.14
+ * Replace sstables on DataTracker before marking them as non-compacting 
during anti-compaction (CASSANDRA-11548)
  * Checking if an unlogged batch is local is inefficient (CASSANDRA-11529)
  * Fix paging for COMPACT tables without clustering columns (CASSANDRA-11467)
  * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java 
b/src/java/org/apache/cassandra/db/DataTracker.java
index ef25236..c731a35 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -222,7 +222,10 @@ public class DataTracker
 
             View newView = currentView.markCompacting(sstables);
             if (view.compareAndSet(currentView, newView))
+            {
+                notifyCompacting(sstables, true);
                 return true;
+            }
         }
     }
 
@@ -247,6 +250,8 @@ public class DataTracker
             // interrupted after the CFS is invalidated, those sstables need 
to be unreferenced as well, so we do that here.
             unreferenceSSTables();
         }
+
+        notifyCompacting(unmark, false);
     }
 
     public void markObsolete(Collection<SSTableReader> sstables, OperationType 
compactionType)
@@ -511,6 +516,13 @@ public class DataTracker
             subscriber.handleNotification(notification, this);
     }
 
+    public void notifyCompacting(Iterable<SSTableReader> reader, boolean 
compacting)
+    {
+        INotification notification = new SSTableCompactingNotification(reader, 
compacting);
+        for (INotificationConsumer subscriber : subscribers)
+            subscriber.handleNotification(notification, this);
+    }
+
     public void notifyAdded(SSTableReader added)
     {
         INotification notification = new SSTableAddedNotification(added);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java
 
b/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java
new file mode 100644
index 0000000..6eddf3f
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.notifications;
+
+import org.apache.cassandra.io.sstable.SSTableReader;
+
+public class SSTableCompactingNotification implements INotification
+{
+    public final Iterable<SSTableReader> sstables;
+    public final boolean compacting;
+
+    public SSTableCompactingNotification(Iterable<SSTableReader> sstables, 
boolean compacting)
+    {
+        this.sstables = sstables;
+        this.compacting = compacting;
+    }
+
+    public String toString()
+    {
+        return "SSTableCompactingNotification{" +
+               "sstables=" + sstables +
+               ", compacting=" + compacting +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
 
b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 0eb769f..fa6a31b 100644
--- 
a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ 
b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.db.compaction;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.*;
@@ -28,9 +31,19 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableCompactingNotification;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Refs;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class LongLeveledCompactionStrategyTest extends SchemaLoader
@@ -125,4 +138,92 @@ public class LongLeveledCompactionStrategyTest extends 
SchemaLoader
             }
         }
     }
+
+    class CheckThatSSTableIsReleasedOnlyAfterCompactionFinishes implements 
INotificationConsumer
+    {
+        public final Set<SSTableReader> finishedCompaction = new HashSet<>();
+
+        boolean failed = false;
+
+        public void handleNotification(INotification received, Object sender)
+        {
+            if (received instanceof SSTableCompactingNotification)
+            {
+                SSTableCompactingNotification notification = 
(SSTableCompactingNotification) received;
+                if (!notification.compacting)
+                {
+                    for (SSTableReader reader : notification.sstables)
+                    {
+                        finishedCompaction.add(reader);
+                    }
+                }
+            }
+            if (received instanceof SSTableListChangedNotification)
+            {
+                SSTableListChangedNotification notification = 
(SSTableListChangedNotification) received;
+                for (SSTableReader reader : notification.removed)
+                {
+                    if (finishedCompaction.contains(reader))
+                        failed = true;
+                }
+            }
+        }
+
+        boolean isFailed()
+        {
+            return failed;
+        }
+    }
+
+    @Test
+    public void testAntiCompactionAfterLCS() throws Exception
+    {
+        testParallelLeveledCompaction();
+
+        String ksname = "Keyspace1";
+        String cfname = "StandardLeveled";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname);
+        WrappingCompactionStrategy strategy = ((WrappingCompactionStrategy) 
store.getCompactionStrategy());
+
+        Collection<SSTableReader> initialSSTables = 
store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), initialSSTables.size());
+
+        CheckThatSSTableIsReleasedOnlyAfterCompactionFinishes checker = new 
CheckThatSSTableIsReleasedOnlyAfterCompactionFinishes();
+        store.getDataTracker().subscribe(checker);
+
+        //anti-compact a subset of sstables
+        Range<Token> range = new Range<Token>(new 
BytesToken("110".getBytes()), new BytesToken("111".getBytes()), 
store.partitioner);
+        List<Range<Token>> ranges = Arrays.asList(range);
+        Refs<SSTableReader> refs = Refs.tryRef(initialSSTables);
+        if (refs == null)
+            throw new IllegalStateException();
+        long repairedAt = 1000;
+        CompactionManager.instance.performAnticompaction(store, ranges, refs, 
repairedAt);
+
+        //check that sstables were released only after compaction finished
+        assertFalse("Anti-compaction released sstable from compacting set 
before compaction was finished",
+                    checker.isFailed());
+
+        //check there is only one global ref count
+        for (SSTableReader sstable : store.getSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.selfRef().globalCount());
+        }
+
+        //check that compacting status was clearedd in all sstables
+        assertEquals(0, store.getDataTracker().getCompacting().size());
+
+        //make sure readers were replaced correctly on unrepaired leveled 
manifest after anti-compaction
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) 
strategy.getWrappedStrategies().get(1);
+        for (SSTableReader reader : initialSSTables)
+        {
+            Range<Token> sstableRange = new 
Range<Token>(reader.first.getToken(), reader.last.getToken());
+            if (sstableRange.intersects(range))
+            {
+                
assertFalse(lcs.manifest.generations[reader.getSSTableLevel()].contains(reader));
+            }
+        }
+    }
 }

Reply via email to