Rework tombstone purgeability checks to not use sentinel timestamp to indicate 
unconditional purgeability

patch by Joel Knighton; reviewed by Branimir Lambov for CASSANDRA-12792


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d2fdfeb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d2fdfeb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d2fdfeb

Branch: refs/heads/cassandra-3.0
Commit: 7d2fdfeb41eca9badaf10b906b6afe077d166348
Parents: 84b9e72
Author: Joel Knighton <joel.knigh...@datastax.com>
Authored: Thu Oct 20 22:51:51 2016 -0500
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Fri Nov 18 12:25:29 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionController.java     |  58 +++++---
 .../db/compaction/CompactionManager.java        |   6 +-
 .../db/compaction/LazilyCompactedRow.java       |  24 ++--
 .../db/compaction/SSTableSplitter.java          |   7 +-
 .../cassandra/db/compaction/Scrubber.java       |   6 +-
 .../cassandra/db/compaction/Upgrader.java       |   6 +-
 .../cassandra/db/compaction/Verifier.java       |   6 +-
 .../db/compaction/CompactionControllerTest.java |  22 ++--
 .../db/compaction/CompactionsPurgeTest.java     | 131 +++++++++++++++++++
 10 files changed, 219 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3482052..54dc4b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction 
(CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using 
prepared statements (CASSANDRA-12863)
  * Fix Util.spinAssertEquals (CASSANDRA-12283)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionController.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 699bc55..e895573 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -186,36 +189,59 @@ public class CompactionController implements AutoCloseable
     }
 
     /**
-     * @return the largest timestamp before which it's okay to drop tombstones 
for the given partition;
-     * i.e., after the maxPurgeableTimestamp there may exist newer data that 
still needs to be suppressed
-     * in other sstables.  This returns the minimum timestamp for any SSTable 
that contains this partition and is not
-     * participating in this compaction, or memtable that contains this 
partition,
-     * or LONG.MAX_VALUE if no SSTable or memtable exist.
+     * @param key
+     * @return a predicate for whether tombstones marked for deletion at the 
given time for the given partition are
+     * purgeable; we calculate this by checking whether the deletion time is 
less than the min timestamp of all SSTables
+     * containing this partition and not participating in the compaction. This 
means there isn't any data in those
+     * sstables that might still need to be suppressed by a tombstone at this 
timestamp.
      */
-    public long maxPurgeableTimestamp(DecoratedKey key)
+    public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
     {
         if (NEVER_PURGE_TOMBSTONES)
-            return Long.MIN_VALUE;
+            return Predicates.alwaysFalse();
 
-        long min = Long.MAX_VALUE;
         overlapIterator.update(key);
-        for (SSTableReader sstable : overlapIterator.overlaps())
+        Set<SSTableReader> filteredSSTables = overlapIterator.overlaps();
+        Iterable<Memtable> memtables = 
cfs.getTracker().getView().getAllMemtables();
+        long minTimestampSeen = Long.MAX_VALUE;
+        boolean hasTimestamp = false;
+
+        for (SSTableReader sstable: filteredSSTables)
         {
             // if we don't have bloom filter(bf_fp_chance=1.0 or filter file 
is missing),
             // we check index file instead.
-            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && 
sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
-                min = Math.min(min, sstable.getMinTimestamp());
-            else if (sstable.getBloomFilter().isPresent(key))
-                min = Math.min(min, sstable.getMinTimestamp());
+            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && 
sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null
+                || sstable.getBloomFilter().isPresent(key))
+            {
+                minTimestampSeen = Math.min(minTimestampSeen, 
sstable.getMinTimestamp());
+                hasTimestamp = true;
+            }
+
         }
 
-        for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+        for (Memtable memtable : memtables)
         {
             ColumnFamily cf = memtable.getColumnFamily(key);
             if (cf != null)
-                min = Math.min(min, memtable.getMinTimestamp());
+            {
+                minTimestampSeen = Math.min(minTimestampSeen, 
memtable.getMinTimestamp());
+                hasTimestamp = true;
+            }
+        }
+
+        if (!hasTimestamp)
+            return Predicates.alwaysTrue();
+        else
+        {
+            final long finalTimestamp = minTimestampSeen;
+            return new Predicate<Long>()
+            {
+                public boolean apply(Long time)
+                {
+                    return time < finalTimestamp;
+                }
+            };
         }
-        return min;
     }
 
     public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 626bd27..8a3c11e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -28,6 +28,8 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -1419,7 +1421,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         }
 
         @Override
-        public long maxPurgeableTimestamp(DecoratedKey key)
+        public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
         {
             /*
              * The main reason we always purge is that including gcable 
tombstone would mean that the
@@ -1432,7 +1434,7 @@ public class CompactionManager implements 
CompactionManagerMBean
              * a tombstone that could shadow a column in another sstable, but 
this is doubly not a concern
              * since validation compaction is read-only.
              */
-            return Long.MAX_VALUE;
+            return Predicates.alwaysTrue();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 74865b2..eaceead 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
 
@@ -52,8 +53,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
 {
     protected final List<? extends OnDiskAtomIterator> rows;
     protected final CompactionController controller;
-    protected boolean hasCalculatedMaxPurgeableTimestamp = false;
-    protected long maxPurgeableTimestamp;
+    protected Predicate<Long> purgeEvaluator;
     protected final ColumnFamily emptyColumnFamily;
     protected ColumnStats columnStats;
     protected boolean closed;
@@ -82,25 +82,21 @@ public class LazilyCompactedRow extends AbstractCompactedRow
 
         emptyColumnFamily = 
ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
         emptyColumnFamily.delete(maxRowTombstone);
-        if (!maxRowTombstone.isLive() && maxRowTombstone.markedForDeleteAt < 
getMaxPurgeableTimestamp())
+        if (!maxRowTombstone.isLive() && 
getPurgeEvaluator().apply(maxRowTombstone.markedForDeleteAt))
             emptyColumnFamily.purgeTombstones(controller.gcBefore);
 
         reducer = new Reducer();
         merger = Iterators.filter(MergeIterator.get(rows, 
emptyColumnFamily.getComparator().onDiskAtomComparator(), reducer), 
Predicates.notNull());
     }
 
-    /**
-     * tombstones with a localDeletionTime before this can be purged.  This is 
the minimum timestamp for any sstable
-     * containing `key` outside of the set of sstables involved in this 
compaction.
-     */
-    private long getMaxPurgeableTimestamp()
+    private Predicate<Long> getPurgeEvaluator()
     {
-        if (!hasCalculatedMaxPurgeableTimestamp)
+        if (purgeEvaluator == null)
         {
-            hasCalculatedMaxPurgeableTimestamp = true;
-            maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
+            purgeEvaluator = controller.getPurgeEvaluator(key);
         }
-        return maxPurgeableTimestamp;
+
+        return purgeEvaluator;
     }
 
     private static void removeDeleted(ColumnFamily cf, boolean shouldPurge, 
DecoratedKey key, CompactionController controller)
@@ -291,7 +287,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < 
getMaxPurgeableTimestamp() ||
+                if (t.data.isGcAble(controller.gcBefore) && 
getPurgeEvaluator().apply(t.timestamp()) ||
                     maxRowTombstone.markedForDeleteAt >= t.timestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
@@ -314,7 +310,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 container.delete(maxRowTombstone);
                 Iterator<Cell> iter = container.iterator();
                 Cell c = iter.next();
-                boolean shouldPurge = c.getLocalDeletionTime() < 
Integer.MAX_VALUE && c.timestamp() < getMaxPurgeableTimestamp();
+                boolean shouldPurge = c.getLocalDeletionTime() < 
Integer.MAX_VALUE && getPurgeEvaluator().apply(c.timestamp());
                 removeDeleted(container, shouldPurge, key, controller);
                 iter = container.iterator();
                 if (!iter.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java 
b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index e9a4f05..6b302d2 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
@@ -94,9 +97,9 @@ public class SSTableSplitter {
         }
 
         @Override
-        public long maxPurgeableTimestamp(DecoratedKey key)
+        public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
         {
-            return Long.MIN_VALUE;
+            return Predicates.alwaysFalse();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 99ee62e..aaed234 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -22,6 +22,8 @@ import java.io.*;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.AbstractIterator;
 
@@ -495,9 +497,9 @@ public class Scrubber implements Closeable
         }
 
         @Override
-        public long maxPurgeableTimestamp(DecoratedKey key)
+        public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
         {
-            return Long.MIN_VALUE;
+            return Predicates.alwaysFalse();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java 
b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index ca975b8..d6ef60e 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.compaction;
 import java.io.File;
 import java.util.*;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -118,9 +120,9 @@ public class Upgrader
         }
 
         @Override
-        public long maxPurgeableTimestamp(DecoratedKey key)
+        public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
         {
-            return Long.MIN_VALUE;
+            return Predicates.alwaysFalse();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java 
b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 0177819..42302fe 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Sets;
 import org.apache.cassandra.db.*;
@@ -276,9 +278,9 @@ public class Verifier implements Closeable
         }
 
         @Override
-        public long maxPurgeableTimestamp(DecoratedKey key)
+        public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
         {
-            return Long.MIN_VALUE;
+            return Predicates.alwaysFalse();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 750a38e..3184159 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.compaction;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.Sets;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,6 +42,8 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.Util.cellname;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
 
 public class CompactionControllerTest extends SchemaLoader
@@ -80,10 +83,10 @@ public class CompactionControllerTest extends SchemaLoader
         // check max purgeable timestamp without any sstables
         try(CompactionController controller = new CompactionController(cfs, 
null, 0))
         {
-            assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); 
//memtable only
+            assertPurgeBoundary(controller.getPurgeEvaluator(key), 
timestamp1); //memtable only
 
             cfs.forceBlockingFlush();
-            assertEquals(Long.MAX_VALUE, 
controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+            
assertTrue(controller.getPurgeEvaluator(key).apply(Long.MAX_VALUE)); //no 
memtables and no sstables
         }
 
         Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // 
first sstable is compacting
@@ -95,11 +98,11 @@ public class CompactionControllerTest extends SchemaLoader
         // check max purgeable timestamp when compacting the first sstable 
with and without a memtable
         try (CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); 
//second sstable only
+            assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp2);
 
             applyMutation(CF1, rowKey, timestamp3);
 
-            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); 
//second sstable and second memtable
+            assertPurgeBoundary(controller.getPurgeEvaluator(key), 
timestamp3); //second sstable and second memtable
         }
 
         // check max purgeable timestamp again without any sstables but with 
different insertion orders on the memtable
@@ -112,7 +115,7 @@ public class CompactionControllerTest extends SchemaLoader
             applyMutation(CF1, rowKey, timestamp2);
             applyMutation(CF1, rowKey, timestamp3);
 
-            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); 
//memtable only
+            assertPurgeBoundary(controller.getPurgeEvaluator(key), 
timestamp3); //memtable only
         }
 
         cfs.forceBlockingFlush();
@@ -124,7 +127,7 @@ public class CompactionControllerTest extends SchemaLoader
             applyMutation(CF1, rowKey, timestamp2);
             applyMutation(CF1, rowKey, timestamp1);
 
-            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); 
//memtable only
+            assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3);
         }
     }
 
@@ -186,6 +189,9 @@ public class CompactionControllerTest extends SchemaLoader
         rm.applyUnsafe();
     }
 
-
-
+    private void assertPurgeBoundary(Predicate<Long> evaluator, long boundary)
+    {
+        assertFalse(evaluator.apply(boundary));
+        assertTrue(evaluator.apply(boundary - 1));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2fdfeb/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index e5baab6..4a1f2ca 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -134,6 +134,137 @@ public class CompactionsPurgeTest
     }
 
     @Test
+    public void testMajorCompactionPurgeTombstonesWithMaxTimestamp()
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key1");
+        Mutation rm;
+
+        // inserts
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(cfName, cellname(String.valueOf(i)), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+        }
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // deletes
+        for (int i = 0; i < 10; i++)
+        {
+            rm = new Mutation(KEYSPACE1, key.getKey());
+            rm.delete(cfName, cellname(String.valueOf(i)), Long.MAX_VALUE);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+
+        // major compact - tombstones should be purged
+        
FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, 
Integer.MAX_VALUE, false));
+
+        // resurrect one column
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        rm.add(cfName, cellname(String.valueOf(5)), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        cfs.invalidateCachedRow(key);
+        ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, 
System.currentTimeMillis()));
+        assertColumns(cf, "5");
+        assert cf.getColumn(cellname(String.valueOf(5))) != null;
+    }
+
+    @Test
+    public void testMajorCompactionPurgeTopLevelTombstoneWithMaxTimestamp()
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key1");
+        Mutation rm;
+
+        // inserts
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(cfName, cellname(String.valueOf(i)), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+        }
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // delete
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        rm.delete(cfName, Long.MAX_VALUE);
+        rm.apply();
+
+        cfs.forceBlockingFlush();
+
+        // major compact - tombstone should be purged
+        
FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, 
Integer.MAX_VALUE, false));
+
+        // resurrect one column
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        rm.add(cfName, cellname(String.valueOf(5)), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        cfs.invalidateCachedRow(key);
+        ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, 
System.currentTimeMillis()));
+        assertColumns(cf, "5");
+        assert cf.getColumn(cellname(String.valueOf(5))) != null;
+    }
+
+    @Test
+    public void testMajorCompactionPurgeRangeTombstoneWithMaxTimestamp()
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key1");
+        Mutation rm;
+
+        // inserts
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(cfName, cellname(String.valueOf(i)), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+        }
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // delete
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        rm.deleteRange(cfName, cellname(String.valueOf(0)), 
cellname(String.valueOf(9)), Long.MAX_VALUE);
+        rm.apply();
+
+        cfs.forceBlockingFlush();
+
+        // major compact - tombstone should be purged
+        
FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, 
Integer.MAX_VALUE, false));
+
+        // resurrect one column
+        rm = new Mutation(KEYSPACE1, key.getKey());
+        rm.add(cfName, cellname(String.valueOf(5)), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        cfs.invalidateCachedRow(key);
+        ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, 
System.currentTimeMillis()));
+        assertColumns(cf, "5");
+        assert cf.getColumn(cellname(String.valueOf(5))) != null;
+    }
+
+    @Test
     public void testMinorCompactionPurge()
     {
         CompactionManager.instance.disableAutoCompaction();

Reply via email to