maxPurgeableTimestamp needs to check memtables too

Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7

Branch: refs/heads/cassandra-3.3
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/AtomicBTreeColumns.java |  14 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  15 +-
 .../db/compaction/CompactionController.java     |  21 +-
 .../db/compaction/LazilyCompactedRow.java       |   2 +-
 .../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
 6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
  * Apply change to compaction throughput in real time (CASSANDRA-10025)
  * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
  * Avoid over-fetching during the page of range queries (CASSANDRA-8521)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
      *
      * @return the difference in size seen after merging the given columns
      */
-    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+    public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
     {
         ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
         DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 {
                     indexer.updateRowLevelIndexes();
                     updater.finish();
-                    return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+                    return updater;
                 }
                 else if (!monitorOwned)
                 {
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
     }
 
     // the function we provide to the btree utilities to perform any column replacements
-    private static final class ColumnUpdater implements UpdateFunction<Cell>
+    static final class ColumnUpdater implements UpdateFunction<Cell>
     {
         final AtomicBTreeColumns updating;
         final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
         long colUpdateTimeDelta = Long.MAX_VALUE;
         final MemtableAllocator.DataReclaimer reclaimer;
         List<Cell> inserted; // TODO: replace with walk of aborted BTree
+        long minTimestamp = Long.MAX_VALUE;
 
         private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
         {
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(insert);
+            minTimestamp = Math.min(minTimestamp, insert.timestamp());
             return insert;
         }
 
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
         {
             Cell reconciled = existing.reconcile(update);
             indexer.update(existing, reconciled);
+            // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+            // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+            // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+            // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+            minTimestamp = Math.min(minTimestamp, update.timestamp());
             if (existing != reconciled)
             {
                 reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 inserted.clear();
             }
             reclaimer.cancel();
+            minTimestamp = Long.MAX_VALUE;
         }
 
         protected void abort(Cell abort)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
 
+    // The smallest timestamp for all partitions stored in this memtable
+    private long minTimestamp = Long.MAX_VALUE;
+
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
-        liveDataSize.addAndGet(initialSize + pair.left);
+        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+        liveDataSize.addAndGet(initialSize + updater.dataSize);
         currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-        return pair.right;
+        return updater.colUpdateTimeDelta;
     }
 
     // for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
         return creationTime;
     }
 
+    public long getMinTimestamp()
+    {
+        return minTimestamp;
+    }
+
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
      * Finds expired sstables
      *
      * works something like this;
-     * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+     * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
      * 2. build a list of fully expired candidates
      * 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
      *    - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
                 minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
         }
 
+        for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+            minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
         // At this point, minTimestamp denotes the lowest timestamp of any relevant
-        // SSTable that contains a constructive value. candidates contains all the
+        // SSTable or Memtable that contains a constructive value. candidates contains all the
         // candidates with no constructive values. The ones out of these that have
         // (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
 
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
      * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
      * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
-     * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+     * participating in this compaction, or memtable that contains this partition,
+     * or LONG.MAX_VALUE if no SSTable or memtable exist.
      */
     public long maxPurgeableTimestamp(DecoratedKey key)
     {
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
             else if (sstable.getBloomFilter().isPresent(key))
                 min = Math.min(min, sstable.getMinTimestamp());
         }
+
+        for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+        {
+            ColumnFamily cf = memtable.getColumnFamily(key);
+            if (cf != null)
+                min = Math.min(min, memtable.getMinTimestamp());
+        }
         return min;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "CompactionControllerTest";
+    private static final String CF1 = "Standard1";
+    private static final String CF2 = "Standard2";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
+    }
+
+    @Test
+    public void testMaxPurgeableTimestamp()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // add to first memtable
+        applyMutation(CF1, rowKey, timestamp1);
+
+        // check max purgeable timestamp without any sstables
+        try(CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+            cfs.forceBlockingFlush();
+            assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+        }
+
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+        // create another sstable
+        applyMutation(CF1, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // check max purgeable timestamp when compacting the first sstable with and without a memtable
+        try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+        }
+
+        // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+        cfs.forceBlockingFlush();
+
+        //newest to oldest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp1);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+
+        cfs.forceBlockingFlush();
+
+        //oldest to newest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp3);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp1);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+    }
+
+    @Test
+    public void testGetFullyExpiredSSTables()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // create sstable with tombstone that should be expired in no older timestamps
+        applyDeleteMutation(CF2, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // first sstable with tombstone is compacting
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+        // create another sstable with more recent timestamp
+        applyMutation(CF2, rowKey, timestamp1);
+        cfs.forceBlockingFlush();
+
+        // second sstable is overlapping
+        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+        // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+        int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(1, expired.size());
+        assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+        // however if we add an older mutation to the memtable then the sstable should not be expired
+        applyMutation(CF2, rowKey, timestamp3);
+        expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(0, expired.size());
+    }
+
+    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        CellName colName = cellname("birthdate");
+        ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.add(cf, colName, val, timestamp);
+        rm.applyUnsafe();
+    }
+
+    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.delete(cf, timestamp);
+        rm.applyUnsafe();
+    }
+
+
+
+}

Reply via email to