Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 b42a0cfe8 -> 6a1c1d900


Skip redundant tombstones on compaction.

Patch by Branimir Lambov; reviewed by marcuse for CASSANDRA-7953


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a61fc01f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a61fc01f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a61fc01f

Branch: refs/heads/cassandra-3.0
Commit: a61fc01f418426847e3aad133127da3615813236
Parents: 02f88e3
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Oct 7 14:46:24 2015 +0300
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Thu Oct 15 15:28:42 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  32 +++--
 .../org/apache/cassandra/db/RangeTombstone.java | 135 ++++++++++---------
 .../cassandra/cql3/RangeTombstoneMergeTest.java | 125 +++++++++++++++++
 4 files changed, 218 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b16acb5..68b44ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Merge range tombstones during compaction (CASSANDRA-7953)
  * (cqlsh) Distinguish negative and positive infinity in output 
(CASSANDRA-10523)
  * (cqlsh) allow custom time_format for COPY TO (CASSANDRA-8970)
  * Don't allow startup if the node's rack has changed (CASSANDRA-10242)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java 
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index d9d6a9c..0ea5c87 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -180,14 +180,24 @@ public class ColumnIndex
                 firstColumn = column;
                 startPosition = endPosition;
                 // TODO: have that use the firstColumn as min + make sure we 
optimize that on read
-                endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, 
output, atomSerializer);
+                endPosition += 
tombstoneTracker.writeOpenedMarkers(firstColumn.name(), output, atomSerializer);
                 blockSize = 0; // We don't count repeated tombstone marker in 
the block size, to avoid a situation
                                // where we wouldn't make any progress because 
a block is filled by said marker
+
+                maybeWriteRowHeader();
             }
 
-            long size = atomSerializer.serializedSizeForSSTable(column);
-            endPosition += size;
-            blockSize += size;
+            if (tombstoneTracker.update(column, false))
+            {
+                long size = tombstoneTracker.writeUnwrittenTombstones(output, 
atomSerializer);
+                size += atomSerializer.serializedSizeForSSTable(column);
+                endPosition += size;
+                blockSize += size;
+
+                atomSerializer.serializeForSSTable(column, output);
+            }
+
+            lastColumn = column;
 
             // if we hit the column index size that we have to index after, go 
ahead and index it.
             if (blockSize >= DatabaseDescriptor.getColumnIndexSize())
@@ -197,14 +207,6 @@ public class ColumnIndex
                 firstColumn = null;
                 lastBlockClosing = column;
             }
-
-            maybeWriteRowHeader();
-            atomSerializer.serializeForSSTable(column, output);
-
-            // TODO: Should deal with removing unneeded tombstones
-            tombstoneTracker.update(column, false);
-
-            lastColumn = column;
         }
 
         private void maybeWriteRowHeader() throws IOException
@@ -216,12 +218,16 @@ public class ColumnIndex
             }
         }
 
-        public ColumnIndex build()
+        public ColumnIndex build() throws IOException
         {
             // all columns were GC'd after all
             if (lastColumn == null)
                 return ColumnIndex.EMPTY;
 
+            long size = tombstoneTracker.writeUnwrittenTombstones(output, 
atomSerializer);
+            endPosition += size;
+            blockSize += size;
+
             // the last column may have fallen on an index boundary already.  
if not, index it explicitly.
             if (result.columnsIndex.isEmpty() || lastBlockClosing != 
lastColumn)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java 
b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 590b005..4d22d48 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -24,6 +24,7 @@ import java.security.MessageDigest;
 import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.OnDiskAtom.Serializer;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISSTableSerializer;
@@ -122,7 +123,12 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
         // never have to test the RTs start since it's always assumed to be 
less than what we have.
         // Also note that this will store expired RTs (#7810). Those will be 
of type ExpiredRangeTombstone and
         // will be ignored by writeOpenedMarker.
-        private final List<RangeTombstone> openedTombstones = new 
LinkedList<RangeTombstone>();
+        private final List<RangeTombstone> openedTombstones = new 
LinkedList<>();
+
+        // Holds tombstones that are processed but not yet written out. 
Delaying the write allows us to remove
+        // duplicate / completely covered tombstones.
+        // Sorted in open order (to be written in that order).
+        private final Set<RangeTombstone> unwrittenTombstones = new 
LinkedHashSet<>();
 
         // Total number of atoms written by writeOpenedMarker().
         private int atomCount;
@@ -146,54 +152,49 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
          * @return the total serialized size of said tombstones and write them 
to
          * {@code out} it if isn't null.
          */
-        public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutputPlus 
out, OnDiskAtom.Serializer atomSerializer) throws IOException
+        public long writeOpenedMarkers(Composite startPos, DataOutputPlus out, 
OnDiskAtom.Serializer atomSerializer) throws IOException
         {
             long size = 0;
-            if (openedTombstones.isEmpty())
-                return size;
-
-            /*
-             * Compute the markers that needs to be written at the beginning of
-             * this block. We need to write one if it is the more recent
-             * (opened) tombstone for at least some part of its range.
-             */
-            List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>();
-            outer:
-            for (RangeTombstone tombstone : openedTombstones)
-            {
-                // If the first column is outside the range, skip it (in case 
update() hasn't been called yet)
-                if (comparator.compare(firstColumn.name(), tombstone.max) > 0)
-                    continue;
 
-                if (tombstone instanceof ExpiredRangeTombstone)
+            for (RangeTombstone rt : openedTombstones)
+            {
+                if (rt instanceof ExpiredRangeTombstone || 
comparator.compare(rt.max, startPos) < 0)
                     continue;
 
-                RangeTombstone updated = new 
RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data);
-
-                Iterator<RangeTombstone> iter = toWrite.iterator();
-                while (iter.hasNext())
-                {
-                    RangeTombstone other = iter.next();
-                    if (other.supersedes(updated, comparator))
-                        break outer;
-                    if (updated.supersedes(other, comparator))
-                        iter.remove();
-                }
-                toWrite.add(tombstone);
+                size += writeTombstone(rt, out, atomSerializer);
             }
+            return size;
+        }
 
-            for (RangeTombstone tombstone : toWrite)
+        /**
+         * Writes out all tombstones that have been accepted after the 
previous call of this method.
+         * Tombstones are not written immediately to permit redundant ones to 
be skipped.
+         *
+         * @return the serialized size of written tombstones
+         */
+        public long writeUnwrittenTombstones(DataOutputPlus out, 
OnDiskAtom.Serializer atomSerializer) throws IOException
+        {
+            long size = 0;
+            for (RangeTombstone rt : unwrittenTombstones)
             {
-                size += atomSerializer.serializedSizeForSSTable(tombstone);
-                atomCount++;
-                if (out != null)
-                    atomSerializer.serializeForSSTable(tombstone, out);
+                size += writeTombstone(rt, out, atomSerializer);
             }
+            unwrittenTombstones.clear();
+            return size;
+        }
+
+        private long writeTombstone(RangeTombstone rt, DataOutputPlus out, 
OnDiskAtom.Serializer atomSerializer)
+                throws IOException
+        {
+            long size = atomSerializer.serializedSizeForSSTable(rt);
+            atomCount++;
+            if (out != null)
+                atomSerializer.serializeForSSTable(rt, out);
             return size;
         }
 
         /**
-         * The total number of atoms written by calls to the method {@link 
#writeOpenedMarker}.
+         * The total number of atoms written by calls to the above methods.
          */
         public int writtenAtom()
         {
@@ -210,7 +211,7 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
          * Note that this method should be called on *every* atom of a 
partition for
          * the tracker to work as efficiently as possible (#9486).
          */
-        public void update(OnDiskAtom atom, boolean isExpired)
+        public boolean update(OnDiskAtom atom, boolean isExpired)
         {
             // Get rid of now useless RTs
             ListIterator<RangeTombstone> iterator = 
openedTombstones.listIterator();
@@ -223,6 +224,8 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
                 if (comparator.compare(atom.name(), t.max) > 0)
                 {
                     iterator.remove();
+                    // The iterator may still be in the unwrittenTombstones 
list. That's ok, it still needs to be written
+                    // but it can't influence anything else.
                 }
                 else
                 {
@@ -237,8 +240,6 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
             if (atom instanceof RangeTombstone)
             {
                 RangeTombstone toAdd = (RangeTombstone)atom;
-                if (isExpired)
-                    toAdd = new ExpiredRangeTombstone(toAdd);
 
                 // We want to maintain openedTombstones in end bounds order so 
we find where to insert the new element
                 // and add it. While doing so, we also check if that new 
tombstone fully shadow or is fully shadowed
@@ -252,41 +253,51 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
                     {
                         // the new one covers more than the existing one. If 
the new one happens to also supersedes
                         // the existing one, remove the existing one. In any 
case, we're not done yet.
-                        if (toAdd.data.supersedes(existing.data))
+                        if (!existing.data.supersedes(toAdd.data))
+                        {
                             iterator.remove();
+                            // If the existing one starts at the same position 
as the new, it does not need to be written
+                            // (it won't have been yet).
+                            if (comparator.compare(toAdd.min, existing.min) == 
0)
+                                unwrittenTombstones.remove(existing);
+                        }
                     }
                     else
                     {
                         // the new one is included in the existing one. If the 
new one supersedes the existing one,
                         // then we add the new one (and if the new one ends 
like the existing one, we can actually remove
                         // the existing one), otherwise we can actually ignore 
it. In any case, we're done.
-                        if (toAdd.data.supersedes(existing.data))
+                        if (!toAdd.data.supersedes(existing.data))
+                            return false;
+
+                        if (cmp == 0)
                         {
-                            if (cmp == 0)
-                                iterator.set(toAdd);
-                            else
-                                insertBefore(toAdd, iterator);
+                            iterator.remove();
+                            // If the existing one starts at the same position 
as the new, it does not need to be written
+                            // (it won't have been yet).
+                            if (comparator.compare(toAdd.min, existing.min) == 
0)
+                                unwrittenTombstones.remove(existing);
                         }
-                        return;
+                        else
+                        {
+                            iterator.previous();
+                        }
+                        // Found the insert position for the new tombstone
+                        break;
                     }
                 }
-                // If we reach here, either we had no tombstones and the new 
one ends after all existing ones.
-                iterator.add(toAdd);
-            }
-        }
 
-        /**
-         * Adds the provided {@code tombstone} _before_ the last element 
returned by {@code iterator.next()}.
-         * <p>
-         * This method assumes that {@code iterator.next()} has been called 
prior to this method call, i.e. that
-         * {@code iterator.hasPrevious() == true}.
-         */
-        private static void insertBefore(RangeTombstone tombstone, 
ListIterator<RangeTombstone> iterator)
-        {
-            assert iterator.hasPrevious();
-            iterator.previous();
-            iterator.add(tombstone);
-            iterator.next();
+                if (isExpired)
+                    iterator.add(new ExpiredRangeTombstone(toAdd));
+                else
+                {
+                    iterator.add(toAdd);
+                    unwrittenTombstones.add(toAdd);
+                }
+                return false;
+            }
+            // Caller should write cell.
+            return true;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java 
b/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java
new file mode 100644
index 0000000..0460a16
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableReader;
+
+public class RangeTombstoneMergeTest extends CQLTester
+{
+    @Before
+    public void before() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                "  key text," +
+                "  column text," +
+                "  data text," +
+                "  extra text," +
+                "  PRIMARY KEY(key, column)" +
+                ");");
+
+        // If the sstable only contains tombstones during compaction it seems 
that the sstable either gets removed or isn't created (but that could probably 
be a separate JIRA issue).
+        execute("INSERT INTO %s (key, column, data) VALUES (?, ?, ?)", "1", 
"1", "1");
+    }
+
+    @Test
+    public void testEqualMerge() throws Throwable
+    {
+        addRemoveAndFlush();
+
+        for (int i=0; i<3; ++i)
+        {
+            addRemoveAndFlush();
+            compact();
+        }
+
+        assertOneTombstone();
+    }
+
+    @Test
+    public void testRangeMerge() throws Throwable
+    {
+        addRemoveAndFlush();
+
+        execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, 
?)", "1", "2", "2", "2");
+        execute("DELETE extra FROM %s WHERE key=? AND column=?", "1", "2");
+
+        flush();
+        compact();
+
+        execute("DELETE FROM %s WHERE key=? AND column=?", "1", "2");
+
+        flush();
+        compact();
+
+        assertOneTombstone();
+    }
+
+    void assertOneTombstone() throws Throwable
+    {
+        assertRows(execute("SELECT column FROM %s"),
+                   row("1"));
+        assertAllRows(row("1", "1", "1", null));
+
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+        ColumnFamily cf = cfs.getColumnFamily(Util.dk("1"), Composites.EMPTY, 
Composites.EMPTY, false, 100, System.currentTimeMillis());
+        assertTrue(cf.deletionInfo().hasRanges());
+        assertEquals(1, cf.deletionInfo().rangeCount());    // Ranges merged 
during CF construction
+
+        assertEquals(1, cfs.getSSTables().size());
+        SSTableReader reader = Iterables.get(cfs.getSSTables(), 0);
+        assertEquals(1, countTombstones(reader));           // See 
CASSANDRA-7953.
+    }
+
+    void addRemoveAndFlush() throws Throwable
+    {
+        execute("INSERT INTO %s (key, column, data) VALUES (?, ?, ?)", "1", 
"2", "2");
+        execute("DELETE FROM %s WHERE key=? AND column=?", "1", "2");
+        flush();
+    }
+
+    int countTombstones(SSTableReader reader)
+    {
+        int tombstones = 0;
+        ISSTableScanner partitions = reader.getScanner();
+        while (partitions.hasNext())
+        {
+            OnDiskAtomIterator iter = partitions.next();
+            while (iter.hasNext())
+            {
+                OnDiskAtom atom = iter.next();
+                if (atom instanceof RangeTombstone)
+                    ++tombstones;
+            }
+        }
+        return tombstones;
+    }
+}

Reply via email to